Ported most examples

- Renamed ChannelBootstrap to Bootstrap
- Renamed ServerChannelBootstrap to ServerBootstrap
- Moved bootstrap classes to io.netty.bootstrap as before
- Moved unfoldAndAdd() to a separate utility class
- Fixed a bug in unfoldAndAdd() where it did not handle ChannelBuffer
  correctly
This commit is contained in:
Trustin Lee 2012-05-29 16:41:26 -07:00
parent b10cf29393
commit 8237afff64
69 changed files with 1005 additions and 1309 deletions

View File

@ -16,16 +16,16 @@
package io.netty.handler.codec.spdy;
import static org.junit.Assert.*;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInboundStreamHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ServerChannelBootstrap;
import java.io.IOException;
import java.net.InetAddress;
@ -127,8 +127,8 @@ public abstract class AbstractSocketSpdyEchoTest {
frames.writeInt(random.nextInt() & 0x7FFFFFFF);
}
private ServerChannelBootstrap sb;
private ChannelBootstrap cb;
private ServerBootstrap sb;
private Bootstrap cb;
@Before
public void initBootstrap() {
@ -142,8 +142,8 @@ public abstract class AbstractSocketSpdyEchoTest {
cb.shutdown();
}
protected abstract ServerChannelBootstrap newServerBootstrap();
protected abstract ChannelBootstrap newClientBootstrap();
protected abstract ServerBootstrap newServerBootstrap();
protected abstract Bootstrap newClientBootstrap();
@Test(timeout = 10000)
public void testSpdyEcho() throws Throwable {

View File

@ -16,8 +16,8 @@
package io.netty.handler.codec.spdy;
import io.netty.channel.ChannelBootstrap;
import io.netty.channel.ServerChannelBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
@ -25,15 +25,15 @@ import io.netty.channel.socket.nio.NioEventLoop;
public class NioNioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest {
@Override
protected ChannelBootstrap newClientBootstrap() {
return new ChannelBootstrap()
protected Bootstrap newClientBootstrap() {
return new Bootstrap()
.eventLoop(new NioEventLoop())
.channel(new NioSocketChannel());
}
@Override
protected ServerChannelBootstrap newServerBootstrap() {
return new ServerChannelBootstrap()
protected ServerBootstrap newServerBootstrap() {
return new ServerBootstrap()
.eventLoop(new NioEventLoop(), new NioEventLoop())
.channel(new NioServerSocketChannel());
}

View File

@ -16,8 +16,8 @@
package io.netty.handler.codec.spdy;
import io.netty.channel.ChannelBootstrap;
import io.netty.channel.ServerChannelBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.oio.OioEventLoop;
@ -26,15 +26,15 @@ import io.netty.channel.socket.oio.OioServerSocketChannel;
public class NioOioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest {
@Override
protected ChannelBootstrap newClientBootstrap() {
return new ChannelBootstrap()
protected Bootstrap newClientBootstrap() {
return new Bootstrap()
.eventLoop(new NioEventLoop())
.channel(new NioSocketChannel());
}
@Override
protected ServerChannelBootstrap newServerBootstrap() {
return new ServerChannelBootstrap()
protected ServerBootstrap newServerBootstrap() {
return new ServerBootstrap()
.eventLoop(new OioEventLoop(), new OioEventLoop())
.channel(new OioServerSocketChannel());
}

View File

@ -16,8 +16,8 @@
package io.netty.handler.codec.spdy;
import io.netty.channel.ChannelBootstrap;
import io.netty.channel.ServerChannelBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.oio.OioEventLoop;
@ -26,15 +26,15 @@ import io.netty.channel.socket.oio.OioSocketChannel;
public class OioNioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest {
@Override
protected ChannelBootstrap newClientBootstrap() {
return new ChannelBootstrap()
protected Bootstrap newClientBootstrap() {
return new Bootstrap()
.eventLoop(new OioEventLoop())
.channel(new OioSocketChannel());
}
@Override
protected ServerChannelBootstrap newServerBootstrap() {
return new ServerChannelBootstrap()
protected ServerBootstrap newServerBootstrap() {
return new ServerBootstrap()
.eventLoop(new NioEventLoop(), new NioEventLoop())
.channel(new NioServerSocketChannel());
}

View File

@ -16,8 +16,8 @@
package io.netty.handler.codec.spdy;
import io.netty.channel.ChannelBootstrap;
import io.netty.channel.ServerChannelBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.oio.OioEventLoop;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import io.netty.channel.socket.oio.OioSocketChannel;
@ -25,15 +25,15 @@ import io.netty.channel.socket.oio.OioSocketChannel;
public class OioOioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest {
@Override
protected ChannelBootstrap newClientBootstrap() {
return new ChannelBootstrap()
protected Bootstrap newClientBootstrap() {
return new Bootstrap()
.eventLoop(new OioEventLoop())
.channel(new OioSocketChannel());
}
@Override
protected ServerChannelBootstrap newServerBootstrap() {
return new ServerChannelBootstrap()
protected ServerBootstrap newServerBootstrap() {
return new ServerBootstrap()
.eventLoop(new OioEventLoop(), new OioEventLoop())
.channel(new OioServerSocketChannel());
}

View File

@ -0,0 +1,69 @@
package io.netty.handler.codec;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelHandlerContext;
import java.util.Queue;
class CodecUtil {
static boolean unfoldAndAdd(
ChannelHandlerContext ctx, Object msg, boolean inbound) throws Exception {
if (msg == null) {
return false;
}
// Note we only recognize Object[] because Iterable is often implemented by user messages.
if (msg instanceof Object[]) {
Object[] array = (Object[]) msg;
if (array.length == 0) {
return false;
}
boolean added = false;
for (Object m: array) {
if (m == null) {
break;
}
if (unfoldAndAdd(ctx, m, inbound)) {
added = true;
}
}
return added;
}
if (inbound) {
Queue<Object> dst = ctx.nextInboundMessageBuffer();
if (dst != null) {
dst.add(msg);
return true;
} else if (msg instanceof ChannelBuffer) {
ChannelBuffer altDst = ctx.nextInboundByteBuffer();
ChannelBuffer src = (ChannelBuffer) msg;
if (altDst != null) {
altDst.writeBytes(src, src.readerIndex(), src.readableBytes());
return true;
}
}
} else {
Queue<Object> dst = ctx.nextOutboundMessageBuffer();
if (dst != null) {
dst.add(msg);
return true;
} else if (msg instanceof ChannelBuffer) {
ChannelBuffer altDst = ctx.nextOutboundByteBuffer();
ChannelBuffer src = (ChannelBuffer) msg;
if (altDst != null) {
altDst.writeBytes(src, src.readerIndex(), src.readableBytes());
return true;
}
}
}
throw new IllegalStateException("no suitable destination buffer found");
}
private CodecUtil() {
// Unused
}
}

View File

@ -1,6 +1,5 @@
package io.netty.handler.codec;
import static io.netty.handler.codec.MessageToMessageEncoder.*;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelInboundHandlerAdapter;
@ -42,7 +41,7 @@ public abstract class MessageToMessageDecoder<I, O> extends ChannelInboundHandle
continue;
}
if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), omsg)) {
if (CodecUtil.unfoldAndAdd(ctx, omsg, true)) {
notify = true;
}
} catch (Throwable t) {

View File

@ -3,7 +3,6 @@ package io.netty.handler.codec;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerContext;
@ -43,7 +42,7 @@ public abstract class MessageToMessageEncoder<I, O> extends ChannelOutboundHandl
continue;
}
if (unfoldAndAdd(ctx, ctx.nextOutboundMessageBuffer(), omsg)) {
if (CodecUtil.unfoldAndAdd(ctx, omsg, false)) {
notify = true;
}
} catch (Throwable t) {
@ -70,47 +69,4 @@ public abstract class MessageToMessageEncoder<I, O> extends ChannelOutboundHandl
}
public abstract O encode(ChannelOutboundHandlerContext<I> ctx, I msg) throws Exception;
static <T> boolean unfoldAndAdd(
ChannelHandlerContext ctx, Queue<Object> dst, Object msg) throws Exception {
if (msg == null) {
return false;
}
if (msg instanceof Object[]) {
Object[] array = (Object[]) msg;
if (array.length == 0) {
return false;
}
boolean added = false;
for (Object m: array) {
if (m == null) {
break;
}
if (unfoldAndAdd(ctx, dst, m)) {
added = true;
}
}
return added;
}
if (msg instanceof Iterable) {
boolean added = false;
@SuppressWarnings("unchecked")
Iterable<Object> i = (Iterable<Object>) msg;
for (Object m: i) {
if (m == null) {
break;
}
if (unfoldAndAdd(ctx, dst, m)) {
added = true;
}
}
return added;
}
dst.add(msg);
return true;
}
}

View File

@ -15,7 +15,6 @@
*/
package io.netty.handler.codec;
import static io.netty.handler.codec.MessageToMessageEncoder.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
@ -378,7 +377,7 @@ public abstract class ReplayingDecoder<O, S extends Enum<S>> extends StreamToMes
}
try {
if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), decodeLast(ctx, replayable))) {
if (CodecUtil.unfoldAndAdd(ctx, decodeLast(ctx, replayable), true)) {
fireInboundBufferUpdated(ctx, in);
}
} catch (Signal replay) {
@ -442,7 +441,7 @@ public abstract class ReplayingDecoder<O, S extends Enum<S>> extends StreamToMes
}
// A successful decode
if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), result)) {
if (CodecUtil.unfoldAndAdd(ctx, result, true)) {
decoded = true;
}
} catch (Throwable t) {

View File

@ -1,6 +1,5 @@
package io.netty.handler.codec;
import static io.netty.handler.codec.MessageToMessageEncoder.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
@ -33,7 +32,7 @@ public abstract class StreamToMessageDecoder<O> extends ChannelInboundHandlerAda
}
try {
if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), decodeLast(ctx, in))) {
if (CodecUtil.unfoldAndAdd(ctx, decodeLast(ctx, in), true)) {
in.discardReadBytes();
ctx.fireInboundBufferUpdated();
}
@ -69,7 +68,7 @@ public abstract class StreamToMessageDecoder<O> extends ChannelInboundHandlerAda
}
}
if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), o)) {
if (CodecUtil.unfoldAndAdd(ctx, o, true)) {
decoded = true;
} else {
break;

View File

@ -15,7 +15,7 @@
*/
package io.netty.example.discard;
import io.netty.channel.ChannelBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
@ -38,7 +38,7 @@ public class DiscardClient {
}
public void run() throws Exception {
ChannelBootstrap b = new ChannelBootstrap();
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
.channel(new NioSocketChannel())

View File

@ -15,9 +15,9 @@
*/
package io.netty.example.discard;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ServerChannelBootstrap;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@ -34,7 +34,7 @@ public class DiscardServer {
}
public void run() throws Exception {
ServerChannelBootstrap b = new ServerChannelBootstrap();
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
.channel(new NioServerSocketChannel())

View File

@ -15,7 +15,7 @@
*/
package io.netty.example.echo;
import io.netty.channel.ChannelBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@ -47,7 +47,7 @@ public class EchoClient {
public void run() throws Exception {
// Configure the client.
ChannelBootstrap b = new ChannelBootstrap();
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
.channel(new NioSocketChannel())

View File

@ -15,10 +15,10 @@
*/
package io.netty.example.echo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ServerChannelBootstrap;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
@ -41,7 +41,7 @@ public class EchoServer {
public void run() throws Exception {
// Configure the server.
ServerChannelBootstrap b = new ServerChannelBootstrap();
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
.channel(new NioServerSocketChannel())

View File

@ -15,7 +15,7 @@
*/
package io.netty.example.factorial;
import io.netty.channel.ChannelBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioSocketChannel;
@ -37,7 +37,7 @@ public class FactorialClient {
}
public void run() throws Exception {
ChannelBootstrap b = new ChannelBootstrap();
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
.channel(new NioSocketChannel())

View File

@ -15,7 +15,7 @@
*/
package io.netty.example.factorial;
import io.netty.channel.ServerChannelBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@ -32,7 +32,7 @@ public class FactorialServer {
}
public void run() throws Exception {
ServerChannelBootstrap b = new ServerChannelBootstrap();
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
.channel(new NioServerSocketChannel())

View File

@ -15,11 +15,9 @@
*/
package io.netty.example.http.file;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class HttpStaticFileServer {
@ -29,20 +27,21 @@ public class HttpStaticFileServer {
this.port = port;
}
public void run() {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool()));
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childInitializer(new HttpStaticFileServerInitializer());
// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new HttpStaticFileServerPipelineFactory());
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(port));
b.bind().sync().channel().closeFuture().sync();
} finally {
b.shutdown();
}
}
public static void main(String[] args) {
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);

View File

@ -20,6 +20,20 @@ import static io.netty.handler.codec.http.HttpHeaders.Names.*;
import static io.netty.handler.codec.http.HttpMethod.*;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.*;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.CharsetUtil;
import java.io.File;
import java.io.FileNotFoundException;
@ -35,27 +49,6 @@ import java.util.TimeZone;
import javax.activation.MimetypesFileTypeMap;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelFutureProgressListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.FileRegion;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.CharsetUtil;
/**
* A simple handler that serves incoming HTTP requests to send their respective
* HTTP responses. It also implements {@code 'If-Modified-Since'} header to
@ -102,15 +95,16 @@ import io.netty.util.CharsetUtil;
*
* </pre>
*/
public class HttpStaticFileServerHandler extends SimpleChannelUpstreamHandler {
public class HttpStaticFileServerHandler extends ChannelInboundMessageHandlerAdapter<HttpRequest> {
public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
public static final String HTTP_DATE_GMT_TIMEZONE = "GMT";
public static final int HTTP_CACHE_SECONDS = 60;
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
HttpRequest request = (HttpRequest) e.getMessage();
public void messageReceived(
ChannelInboundHandlerContext<HttpRequest> ctx, HttpRequest request) throws Exception {
if (request.getMethod() != GET) {
sendError(ctx, METHOD_NOT_ALLOWED);
return;
@ -164,34 +158,11 @@ public class HttpStaticFileServerHandler extends SimpleChannelUpstreamHandler {
response.setHeader(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
Channel ch = e.channel();
// Write the initial line and the header.
ch.write(response);
ctx.write(response);
// Write the content.
ChannelFuture writeFuture;
if (ch.pipeline().get(SslHandler.class) != null) {
// Cannot use zero-copy with HTTPS.
writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192));
} else {
// No encryption - use zero-copy.
final FileRegion region =
new DefaultFileRegion(raf.getChannel(), 0, fileLength);
writeFuture = ch.write(region);
writeFuture.addListener(new ChannelFutureProgressListener() {
@Override
public void operationComplete(ChannelFuture future) {
region.releaseExternalResources();
}
@Override
public void operationProgressed(
ChannelFuture future, long amount, long current, long total) {
System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount);
}
});
}
ChannelFuture writeFuture = ctx.write(new ChunkedFile(raf, 0, fileLength, 8192));
// Decide whether to close the connection or not.
if (!isKeepAlive(request)) {
@ -201,22 +172,20 @@ public class HttpStaticFileServerHandler extends SimpleChannelUpstreamHandler {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
Channel ch = e.channel();
Throwable cause = e.cause();
public void exceptionCaught(
ChannelInboundHandlerContext<HttpRequest> ctx, Throwable cause) throws Exception {
if (cause instanceof TooLongFrameException) {
sendError(ctx, BAD_REQUEST);
return;
}
cause.printStackTrace();
if (ch.isConnected()) {
if (ctx.channel().isActive()) {
sendError(ctx, INTERNAL_SERVER_ERROR);
}
}
private String sanitizeUri(String uri) {
private static String sanitizeUri(String uri) {
// Decode the path.
try {
uri = URLDecoder.decode(uri, "UTF-8");
@ -243,7 +212,7 @@ public class HttpStaticFileServerHandler extends SimpleChannelUpstreamHandler {
return System.getProperty("user.dir") + File.separator + uri;
}
private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
response.setContent(ChannelBuffers.copiedBuffer(
@ -251,7 +220,7 @@ public class HttpStaticFileServerHandler extends SimpleChannelUpstreamHandler {
CharsetUtil.UTF_8));
// Close the connection as soon as the error message is sent.
ctx.channel().write(response).addListener(ChannelFutureListener.CLOSE);
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
}
/**
@ -260,12 +229,12 @@ public class HttpStaticFileServerHandler extends SimpleChannelUpstreamHandler {
* @param ctx
* Context
*/
private void sendNotModified(ChannelHandlerContext ctx) {
private static void sendNotModified(ChannelHandlerContext ctx) {
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.NOT_MODIFIED);
setDateHeader(response);
// Close the connection as soon as the error message is sent.
ctx.channel().write(response).addListener(ChannelFutureListener.CLOSE);
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
}
/**
@ -274,7 +243,7 @@ public class HttpStaticFileServerHandler extends SimpleChannelUpstreamHandler {
* @param response
* HTTP response
*/
private void setDateHeader(HttpResponse response) {
private static void setDateHeader(HttpResponse response) {
SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));
@ -290,7 +259,7 @@ public class HttpStaticFileServerHandler extends SimpleChannelUpstreamHandler {
* @param fileToCache
* file to extract content type
*/
private void setDateAndCacheHeaders(HttpResponse response, File fileToCache) {
private static void setDateAndCacheHeaders(HttpResponse response, File fileToCache) {
SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));
@ -313,7 +282,7 @@ public class HttpStaticFileServerHandler extends SimpleChannelUpstreamHandler {
* @param file
* file to extract content type
*/
private void setContentTypeHeader(HttpResponse response, File file) {
private static void setContentTypeHeader(HttpResponse response, File file) {
MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
response.setHeader(HttpHeaders.Names.CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath()));
}

View File

@ -15,20 +15,19 @@
*/
package io.netty.example.http.file;
import static io.netty.channel.Channels.*;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpChunkAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
public class HttpStaticFileServerPipelineFactory implements ChannelPipelineFactory {
public class HttpStaticFileServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public ChannelPipeline getPipeline() throws Exception {
public void initChannel(SocketChannel ch) throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline();
ChannelPipeline pipeline = ch.pipeline();
// Uncomment the following line if you want HTTPS
//SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
@ -41,6 +40,5 @@ public class HttpStaticFileServerPipelineFactory implements ChannelPipelineFacto
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
pipeline.addLast("handler", new HttpStaticFileServerHandler());
return pipeline;
}
}

View File

@ -15,8 +15,8 @@
*/
package io.netty.example.http.snoop;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBootstrap;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.handler.codec.http.CookieEncoder;
@ -61,7 +61,7 @@ public class HttpSnoopClient {
boolean ssl = scheme.equalsIgnoreCase("https");
// Configure the client.
ChannelBootstrap b = new ChannelBootstrap();
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
.channel(new NioSocketChannel())

View File

@ -15,8 +15,8 @@
*/
package io.netty.example.http.snoop;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ServerChannelBootstrap;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
@ -36,7 +36,7 @@ public class HttpSnoopServer {
public void run() throws Exception {
// Configure the server.
ServerChannelBootstrap b = new ServerChannelBootstrap();
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())

View File

@ -15,11 +15,10 @@
*/
package io.netty.example.http.websocketx.autobahn;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* A Web Socket echo server for running the <a href="http://www.tavendo.de/autobahn/testsuite.html">autobahn</a> test
@ -33,22 +32,24 @@ public class AutobahnServer {
this.port = port;
}
public void run() {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool()));
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childInitializer(new AutobahnServerInitializer());
// bootstrap.setOption("child.tcpNoDelay", true);
ChannelFuture f = b.bind().sync();
System.out.println("Web Socket Server started at port " + port);
f.channel().closeFuture().sync();
// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new AutobahnServerPipelineFactory());
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(port));
System.out.println("Web Socket Server started at port " + port);
} finally {
b.shutdown();
}
}
public static void main(String[] args) {
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);

View File

@ -19,14 +19,12 @@ import static io.netty.handler.codec.http.HttpHeaders.*;
import static io.netty.handler.codec.http.HttpMethod.*;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.*;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
@ -47,14 +45,13 @@ import io.netty.util.CharsetUtil;
/**
* Handles handshakes and messages
*/
public class AutobahnServerHandler extends SimpleChannelUpstreamHandler {
public class AutobahnServerHandler extends ChannelInboundMessageHandlerAdapter<Object> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AutobahnServerHandler.class);
private WebSocketServerHandshaker handshaker;
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Object msg = e.getMessage();
public void messageReceived(ChannelInboundHandlerContext<Object> ctx, Object msg) throws Exception {
if (msg instanceof HttpRequest) {
handleHttpRequest(ctx, (HttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
@ -71,23 +68,23 @@ public class AutobahnServerHandler extends SimpleChannelUpstreamHandler {
// Handshake
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
this.getWebSocketLocation(req), null, false);
this.handshaker = wsFactory.newHandshaker(req);
if (this.handshaker == null) {
getWebSocketLocation(req), null, false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
wsFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
} else {
this.handshaker.handshake(ctx.channel(), req);
handshaker.handshake(ctx.channel(), req);
}
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel %s received %s", ctx.channel().getId(), frame.getClass()
logger.debug(String.format("Channel %s received %s", ctx.channel().id(), frame.getClass()
.getSimpleName()));
}
if (frame instanceof CloseWebSocketFrame) {
this.handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
} else if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(
new PongWebSocketFrame(frame.isFinalFragment(), frame.getRsv(), frame.getBinaryData()));
@ -109,7 +106,7 @@ public class AutobahnServerHandler extends SimpleChannelUpstreamHandler {
}
}
private void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
// Generate an error page if response status code is not OK (200).
if (res.getStatus().getCode() != 200) {
res.setContent(ChannelBuffers.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8));
@ -124,12 +121,12 @@ public class AutobahnServerHandler extends SimpleChannelUpstreamHandler {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
e.cause().printStackTrace();
e.channel().close();
public void exceptionCaught(ChannelInboundHandlerContext<Object> ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
private String getWebSocketLocation(HttpRequest req) {
private static String getWebSocketLocation(HttpRequest req) {
return "ws://" + req.getHeader(HttpHeaders.Names.HOST);
}
}

View File

@ -15,23 +15,20 @@
*/
package io.netty.example.http.websocketx.autobahn;
import static io.netty.channel.Channels.*;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpChunkAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
public class AutobahnServerPipelineFactory implements ChannelPipelineFactory {
public class AutobahnServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public ChannelPipeline getPipeline() throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline();
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("handler", new AutobahnServerHandler());
return pipeline;
}
}

View File

@ -36,19 +36,14 @@
//THE SOFTWARE.
package io.netty.example.http.websocketx.client;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.Channels;
import io.netty.channel.socket.nio.NioClientSocketChannelFactory;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
@ -58,6 +53,9 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import java.net.URI;
import java.util.HashMap;
public class WebSocketClient {
private final URI uri;
@ -67,14 +65,9 @@ public class WebSocketClient {
}
public void run() throws Exception {
ClientBootstrap bootstrap =
new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool()));
Channel ch = null;
Bootstrap b = new Bootstrap();
try {
String protocol = uri.getScheme();
if (!protocol.equals("ws")) {
throw new IllegalArgumentException("Unsupported protocol: " + protocol);
@ -90,29 +83,26 @@ public class WebSocketClient {
new WebSocketClientHandshakerFactory().newHandshaker(
uri, WebSocketVersion.V13, null, false, customHeaders);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", new HttpResponseDecoder());
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("ws-handler", new WebSocketClientHandler(handshaker));
return pipeline;
}
});
b.eventLoop(new NioEventLoop())
.channel(new NioSocketChannel())
.remoteAddress(uri.getHost(), uri.getPort())
.initializer(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new HttpResponseDecoder());
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("ws-handler", new WebSocketClientHandler(handshaker));
}
});
// Connect
System.out.println("WebSocket Client connecting");
ChannelFuture future =
bootstrap.connect(
new InetSocketAddress(uri.getHost(), uri.getPort()));
future.awaitUninterruptibly().sync();
ch = future.channel();
handshaker.handshake(ch).awaitUninterruptibly().sync();
Channel ch = b.connect().sync().channel();
handshaker.handshake(ch).sync();
// Send 10 messages and wait for responses
System.out.println("WebSocket Client sending message");
for (int i = 0; i < 10; i++) {
for (int i = 0; i < 1000; i++) {
ch.write(new TextWebSocketFrame("Message #" + i));
}
@ -126,12 +116,9 @@ public class WebSocketClient {
// WebSocketClientHandler will close the connection when the server
// responds to the CloseWebSocketFrame.
ch.getCloseFuture().awaitUninterruptibly();
ch.closeFuture().sync();
} finally {
if (ch != null) {
ch.close();
}
bootstrap.releaseExternalResources();
b.shutdown();
}
}

View File

@ -38,11 +38,8 @@
package io.netty.example.http.websocketx.client;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
@ -51,7 +48,7 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.CharsetUtil;
public class WebSocketClientHandler extends SimpleChannelUpstreamHandler {
public class WebSocketClientHandler extends ChannelInboundMessageHandlerAdapter<Object> {
private final WebSocketClientHandshaker handshaker;
@ -60,26 +57,26 @@ public class WebSocketClientHandler extends SimpleChannelUpstreamHandler {
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
public void channelInactive(ChannelInboundHandlerContext<Object> ctx) throws Exception {
System.out.println("WebSocket Client disconnected!");
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
public void messageReceived(ChannelInboundHandlerContext<Object> ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (!handshaker.isHandshakeComplete()) {
handshaker.finishHandshake(ch, (HttpResponse) e.getMessage());
handshaker.finishHandshake(ch, (HttpResponse) msg);
System.out.println("WebSocket Client connected!");
return;
}
if (e.getMessage() instanceof HttpResponse) {
HttpResponse response = (HttpResponse) e.getMessage();
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
throw new Exception("Unexpected HttpResponse (status=" + response.getStatus() + ", content="
+ response.getContent().toString(CharsetUtil.UTF_8) + ")");
}
WebSocketFrame frame = (WebSocketFrame) e.getMessage();
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
System.out.println("WebSocket Client received message: " + textFrame.getText());
@ -92,9 +89,8 @@ public class WebSocketClientHandler extends SimpleChannelUpstreamHandler {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
final Throwable t = e.cause();
t.printStackTrace();
e.channel().close();
public void exceptionCaught(ChannelInboundHandlerContext<Object> ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

View File

@ -16,8 +16,9 @@
/**
* <p>This is an example web service client.
* <p>To run this example, you must first start {@link WebSocketServer} and
* then {@link WebSocketClient}.
* <p>To run this example, you must first start
* {@link io.netty.example.http.websocketx.server.WebSocketServer} and
* then {@link io.netty.example.http.websocketx.client.WebSocketClient}.
*/
package io.netty.example.http.websocketx.client;

View File

@ -15,11 +15,10 @@
*/
package io.netty.example.http.websocketx.server;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* A HTTP server which serves Web Socket requests at:
@ -48,21 +47,25 @@ public class WebSocketServer {
this.port = port;
}
public void run() {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool()));
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childInitializer(new WebSocketServerInitializer());
// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new WebSocketServerPipelineFactory());
Channel ch = b.bind().sync().channel();
System.out.println("Web socket server started at port " + port + '.');
System.out.println("Open your browser and navigate to http://localhost:" + port + '/');
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(port));
System.out.println("Web socket server started at port " + port + '.');
System.out.println("Open your browser and navigate to http://localhost:" + port + '/');
ch.closeFuture().sync();
} finally {
b.shutdown();
}
}
public static void main(String[] args) {
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);

View File

@ -20,15 +20,13 @@ import static io.netty.handler.codec.http.HttpHeaders.Names.*;
import static io.netty.handler.codec.http.HttpMethod.*;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
@ -47,7 +45,7 @@ import io.netty.util.CharsetUtil;
/**
* Handles handshakes and messages
*/
public class WebSocketServerHandler extends SimpleChannelUpstreamHandler {
public class WebSocketServerHandler extends ChannelInboundMessageHandlerAdapter<Object> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocketServerHandler.class);
private static final String WEBSOCKET_PATH = "/websocket";
@ -55,8 +53,7 @@ public class WebSocketServerHandler extends SimpleChannelUpstreamHandler {
private WebSocketServerHandshaker handshaker;
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Object msg = e.getMessage();
public void messageReceived(ChannelInboundHandlerContext<Object> ctx, Object msg) throws Exception {
if (msg instanceof HttpRequest) {
handleHttpRequest(ctx, (HttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
@ -91,12 +88,12 @@ public class WebSocketServerHandler extends SimpleChannelUpstreamHandler {
// Handshake
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
this.getWebSocketLocation(req), null, false);
this.handshaker = wsFactory.newHandshaker(req);
if (this.handshaker == null) {
getWebSocketLocation(req), null, false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
wsFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
} else {
this.handshaker.handshake(ctx.channel(), req);
handshaker.handshake(ctx.channel(), req);
}
}
@ -104,7 +101,7 @@ public class WebSocketServerHandler extends SimpleChannelUpstreamHandler {
// Check for closing frame
if (frame instanceof CloseWebSocketFrame) {
this.handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
return;
} else if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.getBinaryData()));
@ -117,12 +114,12 @@ public class WebSocketServerHandler extends SimpleChannelUpstreamHandler {
// Send the uppercase string back.
String request = ((TextWebSocketFrame) frame).getText();
if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel %s received %s", ctx.channel().getId(), request));
logger.debug(String.format("Channel %s received %s", ctx.channel().id(), request));
}
ctx.channel().write(new TextWebSocketFrame(request.toUpperCase()));
}
private void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
// Generate an error page if response status code is not OK (200).
if (res.getStatus().getCode() != 200) {
res.setContent(ChannelBuffers.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8));
@ -137,12 +134,12 @@ public class WebSocketServerHandler extends SimpleChannelUpstreamHandler {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
e.cause().printStackTrace();
e.channel().close();
public void exceptionCaught(ChannelInboundHandlerContext<Object> ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
private String getWebSocketLocation(HttpRequest req) {
private static String getWebSocketLocation(HttpRequest req) {
return "ws://" + req.getHeader(HttpHeaders.Names.HOST) + WEBSOCKET_PATH;
}
}

View File

@ -15,25 +15,22 @@
*/
package io.netty.example.http.websocketx.server;
import static io.netty.channel.Channels.*;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpChunkAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
/**
*/
public class WebSocketServerPipelineFactory implements ChannelPipelineFactory {
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public ChannelPipeline getPipeline() throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline();
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("handler", new WebSocketServerHandler());
return pipeline;
}
}

View File

@ -15,11 +15,10 @@
*/
package io.netty.example.http.websocketx.sslserver;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* A HTTP server which serves Web Socket requests at:
@ -47,21 +46,24 @@ public class WebSocketSslServer {
this.port = port;
}
public void run() {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool()));
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childInitializer(new WebSocketSslServerInitializer());
// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new WebSocketSslServerPipelineFactory());
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(port));
System.out.println("Web socket server started at port " + port + '.');
System.out.println("Open your browser and navigate to https://localhost:" + port + '/');
Channel ch = b.bind().sync().channel();
System.out.println("Web socket server started at port " + port + '.');
System.out.println("Open your browser and navigate to https://localhost:" + port + '/');
ch.closeFuture().sync();
} finally {
b.shutdown();
}
}
public static void main(String[] args) {
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);

View File

@ -20,15 +20,13 @@ import static io.netty.handler.codec.http.HttpHeaders.Names.*;
import static io.netty.handler.codec.http.HttpMethod.*;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
@ -47,7 +45,7 @@ import io.netty.util.CharsetUtil;
/**
* Handles handshakes and messages
*/
public class WebSocketSslServerHandler extends SimpleChannelUpstreamHandler {
public class WebSocketSslServerHandler extends ChannelInboundMessageHandlerAdapter<Object> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocketSslServerHandler.class);
private static final String WEBSOCKET_PATH = "/websocket";
@ -55,8 +53,7 @@ public class WebSocketSslServerHandler extends SimpleChannelUpstreamHandler {
private WebSocketServerHandshaker handshaker;
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Object msg = e.getMessage();
public void messageReceived(ChannelInboundHandlerContext<Object> ctx, Object msg) throws Exception {
if (msg instanceof HttpRequest) {
handleHttpRequest(ctx, (HttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
@ -91,12 +88,12 @@ public class WebSocketSslServerHandler extends SimpleChannelUpstreamHandler {
// Handshake
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
this.getWebSocketLocation(req), null, false);
this.handshaker = wsFactory.newHandshaker(req);
if (this.handshaker == null) {
getWebSocketLocation(req), null, false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
wsFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
} else {
this.handshaker.handshake(ctx.channel(), req);
handshaker.handshake(ctx.channel(), req);
}
}
@ -104,7 +101,7 @@ public class WebSocketSslServerHandler extends SimpleChannelUpstreamHandler {
// Check for closing frame
if (frame instanceof CloseWebSocketFrame) {
this.handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
return;
} else if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.getBinaryData()));
@ -117,12 +114,12 @@ public class WebSocketSslServerHandler extends SimpleChannelUpstreamHandler {
// Send the uppercase string back.
String request = ((TextWebSocketFrame) frame).getText();
if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel %s received %s", ctx.channel().getId(), request));
logger.debug(String.format("Channel %s received %s", ctx.channel().id(), request));
}
ctx.channel().write(new TextWebSocketFrame(request.toUpperCase()));
}
private void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
// Generate an error page if response status code is not OK (200).
if (res.getStatus().getCode() != 200) {
res.setContent(ChannelBuffers.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8));
@ -137,12 +134,12 @@ public class WebSocketSslServerHandler extends SimpleChannelUpstreamHandler {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
e.cause().printStackTrace();
e.channel().close();
public void exceptionCaught(ChannelInboundHandlerContext<Object> ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
private String getWebSocketLocation(HttpRequest req) {
private static String getWebSocketLocation(HttpRequest req) {
return "wss://" + req.getHeader(HttpHeaders.Names.HOST) + WEBSOCKET_PATH;
}
}

View File

@ -15,24 +15,22 @@
*/
package io.netty.example.http.websocketx.sslserver;
import static io.netty.channel.Channels.*;
import javax.net.ssl.SSLEngine;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpChunkAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.ssl.SslHandler;
import javax.net.ssl.SSLEngine;
/**
*/
public class WebSocketSslServerPipelineFactory implements ChannelPipelineFactory {
public class WebSocketSslServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public ChannelPipeline getPipeline() throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline();
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
SSLEngine engine = WebSocketSslServerSslContext.getInstance().getServerContext().createSSLEngine();
engine.setUseClientMode(false);
@ -42,6 +40,5 @@ public class WebSocketSslServerPipelineFactory implements ChannelPipelineFactory
pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("handler", new WebSocketSslServerHandler());
return pipeline;
}
}

View File

@ -15,17 +15,15 @@
*/
package io.netty.example.localtime;
import java.net.InetSocketAddress;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.nio.NioClientSocketChannelFactory;
/**
* Sends a list of continent/city pairs to a {@link LocalTimeServer} to
@ -44,39 +42,35 @@ public class LocalTimeClient {
this.cities.addAll(cities);
}
public void run() {
// Set up.
ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool()));
public void run() throws Exception {
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
.channel(new NioSocketChannel())
.remoteAddress(host, port)
.initializer(new LocalTimeClientInitializer());
// Configure the event pipeline factory.
bootstrap.setPipelineFactory(new LocalTimeClientPipelineFactory());
// Make a new connection.
Channel ch = b.connect().sync().channel();
// Make a new connection.
ChannelFuture connectFuture =
bootstrap.connect(new InetSocketAddress(host, port));
// Get the handler instance to initiate the request.
LocalTimeClientHandler handler =
ch.pipeline().get(LocalTimeClientHandler.class);
// Wait until the connection is made successfully.
Channel channel = connectFuture.awaitUninterruptibly().channel();
// Request and get the response.
List<String> response = handler.getLocalTimes(cities);
// Get the handler instance to initiate the request.
LocalTimeClientHandler handler =
channel.pipeline().get(LocalTimeClientHandler.class);
// Close the connection.
ch.close();
// Request and get the response.
List<String> response = handler.getLocalTimes(cities);
// Close the connection.
channel.close().awaitUninterruptibly();
// Shut down all thread pools to exit.
bootstrap.releaseExternalResources();
// Print the response at last but not least.
Iterator<String> i1 = cities.iterator();
Iterator<String> i2 = response.iterator();
while (i1.hasNext()) {
System.out.format("%28s: %s%n", i1.next(), i2.next());
// Print the response at last but not least.
Iterator<String> i1 = cities.iterator();
Iterator<String> i2 = response.iterator();
while (i1.hasNext()) {
System.out.format("%28s: %s%n", i1.next(), i2.next());
}
} finally {
b.shutdown();
}
}

View File

@ -15,6 +15,15 @@
*/
package io.netty.example.localtime;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.example.localtime.LocalTimeProtocol.Continent;
import io.netty.example.localtime.LocalTimeProtocol.LocalTime;
import io.netty.example.localtime.LocalTimeProtocol.LocalTimes;
import io.netty.example.localtime.LocalTimeProtocol.Location;
import io.netty.example.localtime.LocalTimeProtocol.Locations;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Formatter;
@ -24,20 +33,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.example.localtime.LocalTimeProtocol.Continent;
import io.netty.example.localtime.LocalTimeProtocol.LocalTime;
import io.netty.example.localtime.LocalTimeProtocol.LocalTimes;
import io.netty.example.localtime.LocalTimeProtocol.Location;
import io.netty.example.localtime.LocalTimeProtocol.Locations;
public class LocalTimeClientHandler extends SimpleChannelUpstreamHandler {
public class LocalTimeClientHandler extends ChannelInboundMessageHandlerAdapter<LocalTimes> {
private static final Logger logger = Logger.getLogger(
LocalTimeClientHandler.class.getName());
@ -91,35 +87,20 @@ public class LocalTimeClientHandler extends SimpleChannelUpstreamHandler {
}
@Override
public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
logger.info(e.toString());
}
super.handleUpstream(ctx, e);
public void channelRegistered(ChannelInboundHandlerContext<LocalTimes> ctx) throws Exception {
channel = ctx.channel();
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
channel = e.channel();
super.channelOpen(ctx, e);
public void messageReceived(ChannelInboundHandlerContext<LocalTimes> ctx, LocalTimes msg) throws Exception {
answer.add(msg);
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, final MessageEvent e) {
boolean offered = answer.offer((LocalTimes) e.getMessage());
assert offered;
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
public void exceptionCaught(ChannelInboundHandlerContext<LocalTimes> ctx, Throwable cause) throws Exception {
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.cause());
e.channel().close();
"Unexpected exception from downstream.", cause);
ctx.close();
}
}

View File

@ -15,20 +15,19 @@
*/
package io.netty.example.localtime;
import static io.netty.channel.Channels.*;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
public class LocalTimeClientPipelineFactory implements ChannelPipelineFactory {
public class LocalTimeClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline p = pipeline();
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
p.addLast("protobufDecoder", new ProtobufDecoder(LocalTimeProtocol.LocalTimes.getDefaultInstance()));
@ -36,6 +35,5 @@ public class LocalTimeClientPipelineFactory implements ChannelPipelineFactory {
p.addLast("protobufEncoder", new ProtobufEncoder());
p.addLast("handler", new LocalTimeClientHandler());
return p;
}
}

View File

@ -15,11 +15,9 @@
*/
package io.netty.example.localtime;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Receives a list of continent/city pairs from a {@link LocalTimeClient} to
@ -33,17 +31,19 @@ public class LocalTimeServer {
this.port = port;
}
public void run() {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool()));
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childInitializer(new LocalTimeServerInitializer());
// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new LocalTimeServerPipelineFactory());
b.bind().sync().channel().closeFuture().sync();
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(port));
} finally {
b.shutdown();
}
}
public static void main(String[] args) throws Exception {

View File

@ -16,18 +16,8 @@
package io.netty.example.localtime;
import static java.util.Calendar.*;
import java.util.Calendar;
import java.util.TimeZone;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.example.localtime.LocalTimeProtocol.Continent;
import io.netty.example.localtime.LocalTimeProtocol.DayOfWeek;
import io.netty.example.localtime.LocalTimeProtocol.LocalTime;
@ -35,25 +25,18 @@ import io.netty.example.localtime.LocalTimeProtocol.LocalTimes;
import io.netty.example.localtime.LocalTimeProtocol.Location;
import io.netty.example.localtime.LocalTimeProtocol.Locations;
public class LocalTimeServerHandler extends SimpleChannelUpstreamHandler {
import java.util.Calendar;
import java.util.TimeZone;
import java.util.logging.Level;
import java.util.logging.Logger;
public class LocalTimeServerHandler extends ChannelInboundMessageHandlerAdapter<Locations> {
private static final Logger logger = Logger.getLogger(
LocalTimeServerHandler.class.getName());
@Override
public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
logger.info(e.toString());
}
super.handleUpstream(ctx, e);
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
Locations locations = (Locations) e.getMessage();
public void messageReceived(ChannelInboundHandlerContext<Locations> ctx, Locations locations) throws Exception {
long currentTime = System.currentTimeMillis();
LocalTimes.Builder builder = LocalTimes.newBuilder();
@ -73,17 +56,15 @@ public class LocalTimeServerHandler extends SimpleChannelUpstreamHandler {
setSecond(calendar.get(SECOND)).build());
}
e.channel().write(builder.build());
ctx.write(builder.build());
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
public void exceptionCaught(ChannelInboundHandlerContext<Locations> ctx, Throwable cause) throws Exception {
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.cause());
e.channel().close();
"Unexpected exception from downstream.", cause);
ctx.close();
}
private static String toString(Continent c) {

View File

@ -15,20 +15,18 @@
*/
package io.netty.example.localtime;
import static io.netty.channel.Channels.*;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
public class LocalTimeServerPipelineFactory implements ChannelPipelineFactory {
public class LocalTimeServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline p = pipeline();
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
p.addLast("protobufDecoder", new ProtobufDecoder(LocalTimeProtocol.Locations.getDefaultInstance()));
@ -36,6 +34,5 @@ public class LocalTimeServerPipelineFactory implements ChannelPipelineFactory {
p.addLast("protobufEncoder", new ProtobufEncoder());
p.addLast("handler", new LocalTimeServerHandler());
return p;
}
}

View File

@ -15,19 +15,16 @@
*/
package io.netty.example.objectecho;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.Channels;
import io.netty.channel.socket.nio.NioClientSocketChannelFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.example.echo.EchoClient;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
/**
* Modification of {@link EchoClient} which utilizes Java object serialization.
*/
@ -43,26 +40,27 @@ public class ObjectEchoClient {
this.firstMessageSize = firstMessageSize;
}
public void run() {
// Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool()));
public void run() throws Exception {
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
.channel(new NioSocketChannel())
.remoteAddress(host, port)
.initializer(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new ObjectEncoder(),
new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
new ObjectEchoClientHandler(firstMessageSize));
}
});
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new ObjectEncoder(),
new ObjectDecoder(
ClassResolvers.cacheDisabled(getClass().getClassLoader())),
new ObjectEchoClientHandler(firstMessageSize));
}
});
// Start the connection attempt.
bootstrap.connect(new InetSocketAddress(host, port));
// Start the connection attempt.
b.connect().sync().channel().closeFuture().sync();
} finally {
b.shutdown();
}
}
public static void main(String[] args) throws Exception {

View File

@ -15,32 +15,25 @@
*/
package io.netty.example.objectecho;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
/**
* Handler implementation for the object echo client. It initiates the
* ping-pong traffic between the object echo client and server by sending the
* first message to the server.
*/
public class ObjectEchoClientHandler extends SimpleChannelUpstreamHandler {
public class ObjectEchoClientHandler extends ChannelInboundMessageHandlerAdapter<List<Integer>> {
private static final Logger logger = Logger.getLogger(
ObjectEchoClientHandler.class.getName());
private final List<Integer> firstMessage;
private final AtomicLong transferredMessages = new AtomicLong();
/**
* Creates a client-side handler.
@ -56,42 +49,24 @@ public class ObjectEchoClientHandler extends SimpleChannelUpstreamHandler {
}
}
public long getTransferredMessages() {
return transferredMessages.get();
}
@Override
public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent &&
((ChannelStateEvent) e).getState() != ChannelState.INTEREST_OPS) {
logger.info(e.toString());
}
super.handleUpstream(ctx, e);
}
@Override
public void channelConnected(
ChannelHandlerContext ctx, ChannelStateEvent e) {
public void channelActive(ChannelInboundHandlerContext<List<Integer>> ctx) throws Exception {
// Send the first message if this handler is a client-side handler.
e.channel().write(firstMessage);
ctx.write(firstMessage);
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
public void messageReceived( ChannelInboundHandlerContext<List<Integer>> ctx, List<Integer> msg) throws Exception {
// Echo back the received object to the client.
transferredMessages.incrementAndGet();
e.channel().write(e.getMessage());
ctx.write(msg);
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
ChannelInboundHandlerContext<List<Integer>> ctx, Throwable cause) throws Exception {
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.cause());
e.channel().close();
"Unexpected exception from downstream.", cause);
ctx.close();
}
}

View File

@ -16,18 +16,15 @@
package io.netty.example.objectecho;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.Channels;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.example.echo.EchoServer;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
/**
* Modification of {@link EchoServer} which utilizes Java object serialization.
*/
@ -39,26 +36,27 @@ public class ObjectEchoServer {
this.port = port;
}
public void run() {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool()));
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childInitializer(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new ObjectEncoder(),
new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
new ObjectEchoServerHandler());
}
});
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new ObjectEncoder(),
new ObjectDecoder(
ClassResolvers.cacheDisabled(getClass().getClassLoader())),
new ObjectEchoServerHandler());
}
});
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(port));
// Bind and start to accept incoming connections.
b.bind().sync().channel().closeFuture().sync();
} finally {
b.shutdown();
}
}
public static void main(String[] args) throws Exception {

View File

@ -15,58 +15,35 @@
*/
package io.netty.example.objectecho;
import java.util.concurrent.atomic.AtomicLong;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
/**
* Handles both client-side and server-side handler depending on which
* constructor was called.
*/
public class ObjectEchoServerHandler extends SimpleChannelUpstreamHandler {
public class ObjectEchoServerHandler extends ChannelInboundMessageHandlerAdapter<List<Integer>> {
private static final Logger logger = Logger.getLogger(
ObjectEchoServerHandler.class.getName());
private final AtomicLong transferredMessages = new AtomicLong();
public long getTransferredMessages() {
return transferredMessages.get();
}
@Override
public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent &&
((ChannelStateEvent) e).getState() != ChannelState.INTEREST_OPS) {
logger.info(e.toString());
}
super.handleUpstream(ctx, e);
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
ChannelInboundHandlerContext<List<Integer>> ctx, List<Integer> msg) throws Exception {
// Echo back the received object to the client.
transferredMessages.incrementAndGet();
e.channel().write(e.getMessage());
ctx.write(msg);
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
ChannelInboundHandlerContext<List<Integer>> ctx, Throwable cause) throws Exception {
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.cause());
e.channel().close();
"Unexpected exception from downstream.", cause);
ctx.close();
}
}

View File

@ -15,14 +15,11 @@
*/
package io.netty.example.portunification;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.Channels;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Serves two protocols (HTTP and Factorial) using only one port, enabling
@ -39,21 +36,24 @@ public class PortUnificationServer {
this.port = port;
}
public void run() {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool()));
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childInitializer(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new PortUnificationServerHandler());
}
});
// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new PortUnificationServerHandler());
}
});
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(port));
// Bind and start to accept incoming connections.
b.bind().sync().channel().closeFuture().sync();
} finally {
b.shutdown();
}
}
public static void main(String[] args) throws Exception {

View File

@ -15,18 +15,16 @@
*/
package io.netty.example.portunification;
import javax.net.ssl.SSLEngine;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundStreamHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.example.factorial.BigIntegerDecoder;
import io.netty.example.factorial.FactorialServerHandler;
import io.netty.example.factorial.NumberEncoder;
import io.netty.example.http.snoop.HttpSnoopServerHandler;
import io.netty.example.securechat.SecureChatSslContextFactory;
import io.netty.handler.codec.FrameDecoder;
import io.netty.handler.codec.compression.ZlibDecoder;
import io.netty.handler.codec.compression.ZlibEncoder;
import io.netty.handler.codec.compression.ZlibWrapper;
@ -35,11 +33,13 @@ import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.ssl.SslHandler;
import javax.net.ssl.SSLEngine;
/**
* Manipulates the current pipeline dynamically to switch protocols or enable
* SSL or GZIP.
*/
public class PortUnificationServerHandler extends FrameDecoder {
public class PortUnificationServerHandler extends ChannelInboundStreamHandlerAdapter {
private final boolean detectSsl;
private final boolean detectGzip;
@ -54,11 +54,12 @@ public class PortUnificationServerHandler extends FrameDecoder {
}
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
public void inboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
ChannelBuffer buffer = ctx.inbound().byteBuffer();
// Will use the first two bytes to detect a protocol.
if (buffer.readableBytes() < 2) {
return null;
return;
}
final int magic1 = buffer.getUnsignedByte(buffer.readerIndex());
@ -74,13 +75,14 @@ public class PortUnificationServerHandler extends FrameDecoder {
switchToFactorial(ctx);
} else {
// Unknown protocol; discard everything and close the connection.
buffer.skipBytes(buffer.readableBytes());
ctx.channel().close();
return null;
buffer.clear();
ctx.close();
return;
}
// Forward the current read buffer as is to the new handlers.
return buffer.readBytes(buffer.readableBytes());
ctx.nextInboundByteBuffer().writeBytes(buffer);
ctx.fireInboundBufferUpdated();
}
private boolean isSsl(int magic1) {
@ -102,7 +104,7 @@ public class PortUnificationServerHandler extends FrameDecoder {
return false;
}
private boolean isHttp(int magic1, int magic2) {
private static boolean isHttp(int magic1, int magic2) {
return
magic1 == 'G' && magic2 == 'E' || // GET
magic1 == 'P' && magic2 == 'O' || // POST
@ -115,7 +117,7 @@ public class PortUnificationServerHandler extends FrameDecoder {
magic1 == 'C' && magic2 == 'O'; // CONNECT
}
private boolean isFactorial(int magic1) {
private static boolean isFactorial(int magic1) {
return magic1 == 'F';
}

View File

@ -15,14 +15,9 @@
*/
package io.netty.example.proxy;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.ClientSocketChannelFactory;
import io.netty.channel.socket.nio.NioClientSocketChannelFactory;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class HexDumpProxy {
@ -36,25 +31,23 @@ public class HexDumpProxy {
this.remotePort = remotePort;
}
public void run() {
public void run() throws Exception {
System.err.println(
"Proxying *:" + localPort + " to " +
remoteHost + ':' + remotePort + " ...");
// Configure the bootstrap.
Executor executor = Executors.newCachedThreadPool();
ServerBootstrap sb = new ServerBootstrap(
new NioServerSocketChannelFactory(executor));
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
.channel(new NioServerSocketChannel())
.localAddress(localPort)
.childInitializer(new HexDumpProxyInitializer(remoteHost, remotePort));
// Set up the event pipeline factory.
ClientSocketChannelFactory cf =
new NioClientSocketChannelFactory(executor);
sb.setPipelineFactory(
new HexDumpProxyPipelineFactory(cf, remoteHost, remotePort));
// Start up the server.
sb.bind(new InetSocketAddress(localPort));
b.bind().sync().channel().closeFuture().sync();
} finally {
b.shutdown();
}
}
public static void main(String[] args) throws Exception {

View File

@ -0,0 +1,41 @@
package io.netty.example.proxy;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundStreamHandlerAdapter;
public class HexDumpProxyBackendHandler extends ChannelInboundStreamHandlerAdapter {
private final Channel inboundChannel;
public HexDumpProxyBackendHandler(Channel inboundChannel) {
this.inboundChannel = inboundChannel;
}
@Override
public void channelActive(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
ctx.flush();
}
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
ChannelBuffer in = ctx.inbound().byteBuffer();
ChannelBuffer out = inboundChannel.outbound().byteBuffer();
out.discardReadBytes();
out.writeBytes(in);
in.clear();
inboundChannel.flush();
}
@Override
public void channelInactive(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
HexDumpProxyFrontendHandler.closeOnFlush(inboundChannel);
}
@Override
public void exceptionCaught(ChannelInboundHandlerContext<Byte> ctx, Throwable cause) throws Exception {
cause.printStackTrace();
HexDumpProxyFrontendHandler.closeOnFlush(ctx.channel());
}
}

View File

@ -0,0 +1,110 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.proxy;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundStreamHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class HexDumpProxyFrontendHandler extends ChannelInboundStreamHandlerAdapter {
private final String remoteHost;
private final int remotePort;
private volatile Channel outboundChannel;
public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
}
@Override
public void channelActive(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
// TODO: Suspend incoming traffic until connected to the remote host.
// Currently, we just keep the inbound traffic in the client channel's outbound buffer.
final Channel inboundChannel = ctx.channel();
// Start the connection attempt.
Bootstrap b = new Bootstrap();
b.eventLoop(ctx.channel().eventLoop())
.channel(new NioSocketChannel())
.remoteAddress(remoteHost, remotePort)
.initializer(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HexDumpProxyBackendHandler(inboundChannel));
}
});
ChannelFuture f = b.connect();
outboundChannel = f.channel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// Connection attempt succeeded:
// TODO: Begin to accept incoming traffic.
} else {
// Close the connection if the connection attempt has failed.
inboundChannel.close();
}
}
});
}
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
ChannelBuffer in = ctx.inbound().byteBuffer();
ChannelBuffer out = outboundChannel.outbound().byteBuffer();
out.discardReadBytes();
out.writeBytes(in);
in.clear();
if (outboundChannel.isActive()) {
outboundChannel.flush();
}
}
@Override
public void channelInactive(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
if (outboundChannel != null) {
closeOnFlush(outboundChannel);
}
}
@Override
public void exceptionCaught(ChannelInboundHandlerContext<Byte> ctx, Throwable cause) throws Exception {
cause.printStackTrace();
closeOnFlush(ctx.channel());
}
/**
* Closes the specified channel after all queued write requests are flushed.
*/
static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.flush().addListener(ChannelFutureListener.CLOSE);
}
}
}

View File

@ -1,180 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.proxy;
import java.net.InetSocketAddress;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.channel.socket.ClientSocketChannelFactory;
public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler {
private final ClientSocketChannelFactory cf;
private final String remoteHost;
private final int remotePort;
// This lock guards against the race condition that overrides the
// OP_READ flag incorrectly.
// See the related discussion: http://markmail.org/message/x7jc6mqx6ripynqf
final Object trafficLock = new Object();
private volatile Channel outboundChannel;
public HexDumpProxyInboundHandler(
ClientSocketChannelFactory cf, String remoteHost, int remotePort) {
this.cf = cf;
this.remoteHost = remoteHost;
this.remotePort = remotePort;
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
// Suspend incoming traffic until connected to the remote host.
final Channel inboundChannel = e.channel();
inboundChannel.setReadable(false);
// Start the connection attempt.
ClientBootstrap cb = new ClientBootstrap(cf);
cb.pipeline().addLast("handler", new OutboundHandler(e.channel()));
ChannelFuture f = cb.connect(new InetSocketAddress(remoteHost, remotePort));
outboundChannel = f.channel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// Connection attempt succeeded:
// Begin to accept incoming traffic.
inboundChannel.setReadable(true);
} else {
// Close the connection if the connection attempt has failed.
inboundChannel.close();
}
}
});
}
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
throws Exception {
ChannelBuffer msg = (ChannelBuffer) e.getMessage();
//System.out.println(">>> " + ChannelBuffers.hexDump(msg));
synchronized (trafficLock) {
outboundChannel.write(msg);
// If outboundChannel is saturated, do not read until notified in
// OutboundHandler.channelInterestChanged().
if (!outboundChannel.isWritable()) {
e.channel().setReadable(false);
}
}
}
@Override
public void channelInterestChanged(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
// If inboundChannel is not saturated anymore, continue accepting
// the incoming traffic from the outboundChannel.
synchronized (trafficLock) {
if (e.channel().isWritable()) {
outboundChannel.setReadable(true);
}
}
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
if (outboundChannel != null) {
closeOnFlush(outboundChannel);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
e.cause().printStackTrace();
closeOnFlush(e.channel());
}
private class OutboundHandler extends SimpleChannelUpstreamHandler {
private final Channel inboundChannel;
OutboundHandler(Channel inboundChannel) {
this.inboundChannel = inboundChannel;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
throws Exception {
ChannelBuffer msg = (ChannelBuffer) e.getMessage();
//System.out.println("<<< " + ChannelBuffers.hexDump(msg));
synchronized (trafficLock) {
inboundChannel.write(msg);
// If inboundChannel is saturated, do not read until notified in
// HexDumpProxyInboundHandler.channelInterestChanged().
if (!inboundChannel.isWritable()) {
e.channel().setReadable(false);
}
}
}
@Override
public void channelInterestChanged(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
// If outboundChannel is not saturated anymore, continue accepting
// the incoming traffic from the inboundChannel.
synchronized (trafficLock) {
if (e.channel().isWritable()) {
inboundChannel.setReadable(true);
}
}
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
closeOnFlush(inboundChannel);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
e.cause().printStackTrace();
closeOnFlush(e.channel());
}
}
/**
* Closes the specified channel after all queued write requests are flushed.
*/
static void closeOnFlush(Channel ch) {
if (ch.isConnected()) {
ch.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
}

View File

@ -15,29 +15,25 @@
*/
package io.netty.example.proxy;
import static io.netty.channel.Channels.*;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.socket.ClientSocketChannelFactory;
public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {
public class HexDumpProxyPipelineFactory implements ChannelPipelineFactory {
private final ClientSocketChannelFactory cf;
private final String remoteHost;
private final int remotePort;
public HexDumpProxyPipelineFactory(
ClientSocketChannelFactory cf, String remoteHost, int remotePort) {
this.cf = cf;
public HexDumpProxyInitializer(String remoteHost, int remotePort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
}
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline p = pipeline(); // Note the static import.
p.addLast("handler", new HexDumpProxyInboundHandler(cf, remoteHost, remotePort));
return p;
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.INFO),
new HexDumpProxyFrontendHandler(remoteHost, remotePort));
}
}

View File

@ -15,9 +15,9 @@
*/
package io.netty.example.qotm;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.DatagramChannel;
@ -43,7 +43,7 @@ public class QuoteOfTheMomentClient {
}
public void run() throws Exception {
ChannelBootstrap b = new ChannelBootstrap();
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
.channel(new NioDatagramChannel())

View File

@ -15,7 +15,7 @@
*/
package io.netty.example.qotm;
import io.netty.channel.ChannelBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.DatagramChannel;
@ -39,7 +39,7 @@ public class QuoteOfTheMomentServer {
}
public void run() throws Exception {
ChannelBootstrap b = new ChannelBootstrap();
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
.channel(new NioDatagramChannel())

View File

@ -15,18 +15,16 @@
*/
package io.netty.example.securechat;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.nio.NioClientSocketChannelFactory;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.example.telnet.TelnetClient;
import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
* Simple SSL chat client modified from {@link TelnetClient}.
*/
@ -40,57 +38,45 @@ public class SecureChatClient {
this.port = port;
}
public void run() throws IOException {
// Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool()));
public void run() throws Exception {
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
.channel(new NioSocketChannel())
.remoteAddress(host, port)
.initializer(new SecureChatClientInitializer());
// Configure the pipeline factory.
bootstrap.setPipelineFactory(new SecureChatClientPipelineFactory());
// Start the connection attempt.
Channel ch = b.connect().sync().channel();
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
// Read commands from the stdin.
ChannelFuture lastWriteFuture = null;
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
for (;;) {
String line = in.readLine();
if (line == null) {
break;
}
// Wait until the connection attempt succeeds or fails.
Channel channel = future.awaitUninterruptibly().channel();
if (!future.isSuccess()) {
future.cause().printStackTrace();
bootstrap.releaseExternalResources();
return;
}
// Sends the received line to the server.
lastWriteFuture = ch.write(line + "\r\n");
// Read commands from the stdin.
ChannelFuture lastWriteFuture = null;
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
for (;;) {
String line = in.readLine();
if (line == null) {
break;
// If user typed the 'bye' command, wait until the server closes
// the connection.
if (line.toLowerCase().equals("bye")) {
ch.closeFuture().sync();
break;
}
}
// Sends the received line to the server.
lastWriteFuture = channel.write(line + "\r\n");
// If user typed the 'bye' command, wait until the server closes
// the connection.
if (line.toLowerCase().equals("bye")) {
channel.getCloseFuture().awaitUninterruptibly();
break;
// Wait until all messages are flushed before closing the channel.
if (lastWriteFuture != null) {
lastWriteFuture.sync();
}
} finally {
// The connection is closed automatically on shutdown.
b.shutdown();
}
// Wait until all messages are flushed before closing the channel.
if (lastWriteFuture != null) {
lastWriteFuture.awaitUninterruptibly();
}
// Close the connection. Make sure the close operation ends because
// all I/O operations are asynchronous in Netty.
channel.close().awaitUninterruptibly();
// Shut down all thread pools to exit.
bootstrap.releaseExternalResources();
}
public static void main(String[] args) throws Exception {

View File

@ -15,37 +15,23 @@
*/
package io.netty.example.securechat;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.handler.ssl.SslHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.handler.ssl.SslHandler;
/**
* Handles a client-side channel.
*/
public class SecureChatClientHandler extends SimpleChannelUpstreamHandler {
public class SecureChatClientHandler extends ChannelInboundMessageHandlerAdapter<String> {
private static final Logger logger = Logger.getLogger(
SecureChatClientHandler.class.getName());
@Override
public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
logger.info(e.toString());
}
super.handleUpstream(ctx, e);
}
@Override
public void channelConnected(
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
public void channelActive(ChannelInboundHandlerContext<String> ctx) throws Exception {
// Get the SslHandler from the pipeline
// which were added in SecureChatPipelineFactory.
SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
@ -55,18 +41,15 @@ public class SecureChatClientHandler extends SimpleChannelUpstreamHandler {
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
System.err.println(e.getMessage());
public void messageReceived(ChannelInboundHandlerContext<String> ctx, String msg) throws Exception {
System.err.println(msg);
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
public void exceptionCaught(ChannelInboundHandlerContext<String> ctx, Throwable cause) throws Exception {
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.cause());
e.channel().close();
"Unexpected exception from downstream.", cause);
ctx.close();
}
}

View File

@ -15,27 +15,25 @@
*/
package io.netty.example.securechat;
import static io.netty.channel.Channels.*;
import javax.net.ssl.SSLEngine;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.ssl.SslHandler;
import javax.net.ssl.SSLEngine;
/**
* Creates a newly configured {@link ChannelPipeline} for a new channel.
*/
public class SecureChatClientPipelineFactory implements
ChannelPipelineFactory {
public class SecureChatClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// Add SSL handler first to encrypt and decrypt everything.
// In this example, we use a bogus certificate in the server side
@ -57,7 +55,5 @@ public class SecureChatClientPipelineFactory implements
// and then business logic.
pipeline.addLast("handler", new SecureChatClientHandler());
return pipeline;
}
}

View File

@ -15,11 +15,9 @@
*/
package io.netty.example.securechat;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.example.telnet.TelnetServer;
/**
@ -34,16 +32,17 @@ public class SecureChatServer {
}
public void run() {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool()));
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childInitializer(new SecureChatServerInitializer());
// Configure the pipeline factory.
bootstrap.setPipelineFactory(new SecureChatServerPipelineFactory());
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(port));
b.bind().sync().channel().closeFuture().sync();
} finally {
b.shutdown();
}
}
public static void main(String[] args) throws Exception {

View File

@ -15,27 +15,23 @@
*/
package io.netty.example.securechat;
import java.net.InetAddress;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.ssl.SslHandler;
import java.net.InetAddress;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Handles a server-side channel.
*/
public class SecureChatServerHandler extends SimpleChannelUpstreamHandler {
public class SecureChatServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
private static final Logger logger = Logger.getLogger(
SecureChatServerHandler.class.getName());
@ -43,18 +39,7 @@ public class SecureChatServerHandler extends SimpleChannelUpstreamHandler {
static final ChannelGroup channels = new DefaultChannelGroup();
@Override
public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
logger.info(e.toString());
}
super.handleUpstream(ctx, e);
}
@Override
public void channelConnected(
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
public void channelActive(ChannelInboundHandlerContext<String> ctx) throws Exception {
// Get the SslHandler in the current pipeline.
// We added it in SecureChatPipelineFactory.
final SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
@ -65,24 +50,11 @@ public class SecureChatServerHandler extends SimpleChannelUpstreamHandler {
}
@Override
public void channelDisconnected(
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
// Unregister the channel from the global channel list
// so the channel does not receive messages anymore.
channels.remove(e.channel());
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
// Convert to a String first.
String request = (String) e.getMessage();
public void messageReceived(ChannelInboundHandlerContext<String> ctx, String request) throws Exception {
// Send the received message to all channels but the current one.
for (Channel c: channels) {
if (c != e.channel()) {
c.write("[" + e.channel().getRemoteAddress() + "] " +
if (c != ctx.channel()) {
c.write("[" + ctx.channel().remoteAddress() + "] " +
request + '\n');
} else {
c.write("[you] " + request + '\n');
@ -91,18 +63,16 @@ public class SecureChatServerHandler extends SimpleChannelUpstreamHandler {
// Close the connection if the client has sent 'bye'.
if (request.toLowerCase().equals("bye")) {
e.channel().close();
ctx.close();
}
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
public void exceptionCaught(ChannelInboundHandlerContext<String> ctx, Throwable cause) throws Exception {
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.cause());
e.channel().close();
"Unexpected exception from downstream.", cause);
ctx.close();
}
private static final class Greeter implements ChannelFutureListener {

View File

@ -15,27 +15,25 @@
*/
package io.netty.example.securechat;
import static io.netty.channel.Channels.*;
import javax.net.ssl.SSLEngine;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.ssl.SslHandler;
import javax.net.ssl.SSLEngine;
/**
* Creates a newly configured {@link ChannelPipeline} for a new channel.
*/
public class SecureChatServerPipelineFactory implements
ChannelPipelineFactory {
public class SecureChatServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// Add SSL handler first to encrypt and decrypt everything.
// In this example, we use a bogus certificate in the server side
@ -60,7 +58,5 @@ public class SecureChatServerPipelineFactory implements
// and then business logic.
pipeline.addLast("handler", new SecureChatServerHandler());
return pipeline;
}
}

View File

@ -15,16 +15,14 @@
*/
package io.netty.example.telnet;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.nio.NioClientSocketChannelFactory;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
* Simplistic telnet client.
@ -39,57 +37,44 @@ public class TelnetClient {
this.port = port;
}
public void run() throws IOException {
// Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool()));
public void run() throws Exception {
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
.channel(new NioSocketChannel())
.remoteAddress(host, port)
.initializer(new TelnetClientInitializer());
// Configure the pipeline factory.
bootstrap.setPipelineFactory(new TelnetClientPipelineFactory());
// Start the connection attempt.
Channel ch = b.connect().sync().channel();
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
// Read commands from the stdin.
ChannelFuture lastWriteFuture = null;
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
for (;;) {
String line = in.readLine();
if (line == null) {
break;
}
// Wait until the connection attempt succeeds or fails.
Channel channel = future.awaitUninterruptibly().channel();
if (!future.isSuccess()) {
future.cause().printStackTrace();
bootstrap.releaseExternalResources();
return;
}
// Sends the received line to the server.
lastWriteFuture = ch.write(line + "\r\n");
// Read commands from the stdin.
ChannelFuture lastWriteFuture = null;
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
for (;;) {
String line = in.readLine();
if (line == null) {
break;
// If user typed the 'bye' command, wait until the server closes
// the connection.
if (line.toLowerCase().equals("bye")) {
ch.closeFuture().sync();
break;
}
}
// Sends the received line to the server.
lastWriteFuture = channel.write(line + "\r\n");
// If user typed the 'bye' command, wait until the server closes
// the connection.
if (line.toLowerCase().equals("bye")) {
channel.getCloseFuture().awaitUninterruptibly();
break;
// Wait until all messages are flushed before closing the channel.
if (lastWriteFuture != null) {
lastWriteFuture.sync();
}
} finally {
b.shutdown();
}
// Wait until all messages are flushed before closing the channel.
if (lastWriteFuture != null) {
lastWriteFuture.awaitUninterruptibly();
}
// Close the connection. Make sure the close operation ends because
// all I/O operations are asynchronous in Netty.
channel.close().awaitUninterruptibly();
// Shut down all thread pools to exit.
bootstrap.releaseExternalResources();
}
public static void main(String[] args) throws Exception {

View File

@ -15,47 +15,31 @@
*/
package io.netty.example.telnet;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
/**
* Handles a client-side channel.
*/
public class TelnetClientHandler extends SimpleChannelUpstreamHandler {
public class TelnetClientHandler extends ChannelInboundMessageHandlerAdapter<String> {
private static final Logger logger = Logger.getLogger(
TelnetClientHandler.class.getName());
@Override
public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
logger.info(e.toString());
}
super.handleUpstream(ctx, e);
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
public void messageReceived(ChannelInboundHandlerContext<String> ctx, String msg) throws Exception {
// Print out the line received from the server.
System.err.println(e.getMessage());
System.err.println(msg);
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
public void exceptionCaught(ChannelInboundHandlerContext<String> ctx, Throwable cause) throws Exception {
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.cause());
e.channel().close();
"Unexpected exception from downstream.", cause);
ctx.close();
}
}

View File

@ -15,10 +15,9 @@
*/
package io.netty.example.telnet;
import static io.netty.channel.Channels.*;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
@ -27,13 +26,11 @@ import io.netty.handler.codec.string.StringEncoder;
/**
* Creates a newly configured {@link ChannelPipeline} for a new channel.
*/
public class TelnetClientPipelineFactory implements
ChannelPipelineFactory {
public class TelnetClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
public ChannelPipeline getPipeline() throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline();
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// Add the text line codec combination first,
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
@ -43,7 +40,5 @@ public class TelnetClientPipelineFactory implements
// and then business logic.
pipeline.addLast("handler", new TelnetClientHandler());
return pipeline;
}
}

View File

@ -15,11 +15,9 @@
*/
package io.netty.example.telnet;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Simplistic telnet server.
@ -32,17 +30,18 @@ public class TelnetServer {
this.port = port;
}
public void run() {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool()));
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childInitializer(new TelnetServerPipelineFactory());
// Configure the pipeline factory.
bootstrap.setPipelineFactory(new TelnetServerPipelineFactory());
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(port));
b.bind().sync().channel().closeFuture().sync();
} finally {
b.shutdown();
}
}
public static void main(String[] args) throws Exception {

View File

@ -15,54 +15,34 @@
*/
package io.netty.example.telnet;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import java.net.InetAddress;
import java.util.Date;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
/**
* Handles a server-side channel.
*/
public class TelnetServerHandler extends SimpleChannelUpstreamHandler {
public class TelnetServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
private static final Logger logger = Logger.getLogger(
TelnetServerHandler.class.getName());
@Override
public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
logger.info(e.toString());
}
super.handleUpstream(ctx, e);
}
@Override
public void channelConnected(
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
public void channelActive(ChannelInboundHandlerContext<String> ctx) throws Exception {
// Send greeting for a new connection.
e.channel().write(
ctx.write(
"Welcome to " + InetAddress.getLocalHost().getHostName() + "!\r\n");
e.channel().write("It is " + new Date() + " now.\r\n");
ctx.write("It is " + new Date() + " now.\r\n");
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
// Cast to a String first.
// We know it is a String because we put some codec in TelnetPipelineFactory.
String request = (String) e.getMessage();
public void messageReceived(ChannelInboundHandlerContext<String> ctx, String request) throws Exception {
// Generate and write a response.
String response;
boolean close = false;
@ -77,7 +57,7 @@ public class TelnetServerHandler extends SimpleChannelUpstreamHandler {
// We do not need to write a ChannelBuffer here.
// We know the encoder inserted at TelnetPipelineFactory will do the conversion.
ChannelFuture future = e.channel().write(response);
ChannelFuture future = ctx.write(response);
// Close the connection after sending 'Have a good day!'
// if the client has sent 'bye'.
@ -87,12 +67,10 @@ public class TelnetServerHandler extends SimpleChannelUpstreamHandler {
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
public void exceptionCaught(ChannelInboundHandlerContext<String> ctx, Throwable cause) throws Exception {
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.cause());
e.channel().close();
"Unexpected exception from downstream.", cause);
ctx.close();
}
}

View File

@ -15,10 +15,9 @@
*/
package io.netty.example.telnet;
import static io.netty.channel.Channels.*;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
@ -27,13 +26,10 @@ import io.netty.handler.codec.string.StringEncoder;
/**
* Creates a newly configured {@link ChannelPipeline} for a new channel.
*/
public class TelnetServerPipelineFactory implements
ChannelPipelineFactory {
public class TelnetServerPipelineFactory extends ChannelInitializer<SocketChannel> {
@Override
public ChannelPipeline getPipeline() throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline();
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// Add the text line codec combination first,
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
@ -43,7 +39,5 @@ public class TelnetServerPipelineFactory implements
// and then business logic.
pipeline.addLast("handler", new TelnetServerHandler());
return pipeline;
}
}

View File

@ -15,18 +15,13 @@
*/
package io.netty.example.uptime;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.Channels;
import io.netty.channel.socket.nio.NioClientSocketChannelFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
/**
@ -51,34 +46,27 @@ public class UptimeClient {
}
public void run() {
// Initialize the timer that schedules subsequent reconnection attempts.
final Timer timer = new HashedWheelTimer();
configureBootstrap(new Bootstrap()).connect();
}
// Configure the client.
final ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool()));
private Bootstrap configureBootstrap(Bootstrap b) {
return configureBootstrap(b, new NioEventLoop());
}
// Configure the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
private final ChannelHandler timeoutHandler =
new ReadTimeoutHandler(timer, READ_TIMEOUT);
private final ChannelHandler uptimeHandler =
new UptimeClientHandler(bootstrap, timer);
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
timeoutHandler, uptimeHandler);
Bootstrap configureBootstrap(Bootstrap b, EventLoop l) {
b.eventLoop(l)
.channel(new NioSocketChannel())
.remoteAddress(host, port)
.initializer(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new ReadTimeoutHandler(READ_TIMEOUT),
new UptimeClientHandler(UptimeClient.this));
}
});
});
bootstrap.setOption(
"remoteAddress", new InetSocketAddress(host, port));
// Initiate the first connection attempt - the rest is handled by
// UptimeClientHandler.
bootstrap.connect();
return b;
}
public static void main(String[] args) throws Exception {

View File

@ -15,67 +15,58 @@
*/
package io.netty.example.uptime;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundStreamHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.net.ConnectException;
import java.util.concurrent.TimeUnit;
/**
* Keep reconnecting to the server while printing out the current uptime and
* connection attempt status.
*/
public class UptimeClientHandler extends SimpleChannelUpstreamHandler {
public class UptimeClientHandler extends ChannelInboundStreamHandlerAdapter {
final ClientBootstrap bootstrap;
private final Timer timer;
private final UptimeClient client;
private long startTime = -1;
public UptimeClientHandler(ClientBootstrap bootstrap, Timer timer) {
this.bootstrap = bootstrap;
this.timer = timer;
}
InetSocketAddress getRemoteAddress() {
return (InetSocketAddress) bootstrap.getOption("remoteAddress");
public UptimeClientHandler(UptimeClient client) {
this.client = client;
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
println("Disconnected from: " + getRemoteAddress());
public void channelActive(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
if (startTime < 0) {
startTime = System.currentTimeMillis();
}
println("Connected to: " + ctx.channel().remoteAddress());
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) {
public void channelInactive(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
println("Disconnected from: " + ctx.channel().remoteAddress());
}
@Override
public void channelUnregistered(final ChannelInboundHandlerContext<Byte> ctx)
throws Exception {
println("Sleeping for: " + UptimeClient.RECONNECT_DELAY + "s");
timer.newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
println("Reconnecting to: " + getRemoteAddress());
bootstrap.connect();
final EventLoop loop = ctx.channel().eventLoop();
loop.schedule(new Runnable() {
@Override
public void run() {
println("Reconnecting to: " + ctx.channel().remoteAddress());
client.configureBootstrap(new Bootstrap(), loop).connect();
}
}, UptimeClient.RECONNECT_DELAY, TimeUnit.SECONDS);
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
if (startTime < 0) {
startTime = System.currentTimeMillis();
}
println("Connected to: " + getRemoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
Throwable cause = e.cause();
public void exceptionCaught(ChannelInboundHandlerContext<Byte> ctx, Throwable cause) throws Exception {
if (cause instanceof ConnectException) {
startTime = -1;
println("Failed to connect: " + cause.getMessage());

View File

@ -1,5 +1,12 @@
package io.netty.channel;
package io.netty.bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
@ -10,9 +17,9 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
public class ChannelBootstrap {
public class Bootstrap {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelBootstrap.class);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Bootstrap.class);
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
private EventLoop eventLoop;
@ -21,23 +28,29 @@ public class ChannelBootstrap {
private SocketAddress localAddress;
private SocketAddress remoteAddress;
public ChannelBootstrap eventLoop(EventLoop eventLoop) {
public Bootstrap eventLoop(EventLoop eventLoop) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (this.eventLoop != null) {
throw new IllegalStateException("eventLoop set already");
}
this.eventLoop = eventLoop;
return this;
}
public ChannelBootstrap channel(Channel channel) {
public Bootstrap channel(Channel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (this.channel != null) {
throw new IllegalStateException("channel set already");
}
this.channel = channel;
return this;
}
public <T> ChannelBootstrap option(ChannelOption<T> option, T value) {
public <T> Bootstrap option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
@ -49,7 +62,7 @@ public class ChannelBootstrap {
return this;
}
public ChannelBootstrap initializer(ChannelHandler initializer) {
public Bootstrap initializer(ChannelHandler initializer) {
if (initializer == null) {
throw new NullPointerException("initializer");
}
@ -57,27 +70,27 @@ public class ChannelBootstrap {
return this;
}
public ChannelBootstrap localAddress(SocketAddress localAddress) {
public Bootstrap localAddress(SocketAddress localAddress) {
this.localAddress = localAddress;
return this;
}
public ChannelBootstrap localAddress(int port) {
public Bootstrap localAddress(int port) {
localAddress = new InetSocketAddress(port);
return this;
}
public ChannelBootstrap localAddress(String host, int port) {
public Bootstrap localAddress(String host, int port) {
localAddress = new InetSocketAddress(host, port);
return this;
}
public ChannelBootstrap remoteAddress(SocketAddress remoteAddress) {
public Bootstrap remoteAddress(SocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
return this;
}
public ChannelBootstrap remoteAddress(String host, int port) {
public Bootstrap remoteAddress(String host, int port) {
remoteAddress = new InetSocketAddress(host, port);
return this;
}
@ -141,7 +154,7 @@ public class ChannelBootstrap {
}
ChannelPipeline p = channel.pipeline();
p.addLast(DefaultChannelPipeline.generateName(initializer), initializer);
p.addLast(initializer);
for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
try {

View File

@ -1,5 +1,18 @@
package io.netty.channel;
package io.netty.bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.ServerChannel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.SocketAddresses;
@ -12,16 +25,15 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
public class ServerChannelBootstrap {
public class ServerBootstrap {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerChannelBootstrap.class);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
private static final InetSocketAddress DEFAULT_LOCAL_ADDR = new InetSocketAddress(SocketAddresses.LOCALHOST, 0);
private final ChannelHandler acceptor = new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
Acceptor acceptor = new Acceptor();
ch.pipeline().addLast(DefaultChannelPipeline.generateName(acceptor), acceptor);
ch.pipeline().addLast(new Acceptor());
}
};
@ -34,24 +46,30 @@ public class ServerChannelBootstrap {
private ChannelHandler childInitializer;
private SocketAddress localAddress;
public ServerChannelBootstrap eventLoop(EventLoop parentEventLoop, EventLoop childEventLoop) {
public ServerBootstrap eventLoop(EventLoop parentEventLoop, EventLoop childEventLoop) {
if (parentEventLoop == null) {
throw new NullPointerException("parentEventLoop");
}
if (this.parentEventLoop != null) {
throw new IllegalStateException("eventLoop set already");
}
this.parentEventLoop = parentEventLoop;
this.childEventLoop = childEventLoop;
return this;
}
public ServerChannelBootstrap channel(ServerChannel channel) {
public ServerBootstrap channel(ServerChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (this.channel != null) {
throw new IllegalStateException("channel set already");
}
this.channel = channel;
return this;
}
public <T> ServerChannelBootstrap option(ChannelOption<T> parentOption, T value) {
public <T> ServerBootstrap option(ChannelOption<T> parentOption, T value) {
if (parentOption == null) {
throw new NullPointerException("parentOption");
}
@ -63,7 +81,7 @@ public class ServerChannelBootstrap {
return this;
}
public <T> ServerChannelBootstrap childOption(ChannelOption<T> childOption, T value) {
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
if (childOption == null) {
throw new NullPointerException("childOption");
}
@ -75,12 +93,12 @@ public class ServerChannelBootstrap {
return this;
}
public ServerChannelBootstrap initializer(ChannelHandler initializer) {
public ServerBootstrap initializer(ChannelHandler initializer) {
this.initializer = initializer;
return this;
}
public ServerChannelBootstrap childInitializer(ChannelHandler childInitializer) {
public ServerBootstrap childInitializer(ChannelHandler childInitializer) {
if (childInitializer == null) {
throw new NullPointerException("childInitializer");
}
@ -88,7 +106,7 @@ public class ServerChannelBootstrap {
return this;
}
public ServerChannelBootstrap localAddress(SocketAddress localAddress) {
public ServerBootstrap localAddress(SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
@ -96,12 +114,12 @@ public class ServerChannelBootstrap {
return this;
}
public ServerChannelBootstrap localAddress(int port) {
public ServerBootstrap localAddress(int port) {
localAddress = new InetSocketAddress(port);
return this;
}
public ServerChannelBootstrap localAddress(String host, int port) {
public ServerBootstrap localAddress(String host, int port) {
localAddress = new InetSocketAddress(host, port);
return this;
}
@ -128,9 +146,9 @@ public class ServerChannelBootstrap {
ChannelPipeline p = channel.pipeline();
if (initializer != null) {
p.addLast(DefaultChannelPipeline.generateName(initializer), initializer);
p.addLast(initializer);
}
p.addLast(DefaultChannelPipeline.generateName(acceptor), acceptor);
p.addLast(acceptor);
ChannelFuture f = parentEventLoop.register(channel).awaitUninterruptibly();
if (!f.isSuccess()) {
@ -187,7 +205,7 @@ public class ServerChannelBootstrap {
break;
}
child.pipeline().addLast(DefaultChannelPipeline.generateName(childInitializer), childInitializer);
child.pipeline().addLast(childInitializer);
for (Entry<ChannelOption<?>, Object> e: childOptions.entrySet()) {
try {

View File

@ -179,7 +179,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return this;
}
static String generateName(ChannelHandler handler) {
private static String generateName(ChannelHandler handler) {
String type = handler.getClass().getSimpleName();
StringBuilder buf = new StringBuilder(type.length() + 10);
buf.append(type);