Updated all examples to use ChannelPipelineFactory instead of the default pipeline

This commit is contained in:
Trustin Lee 2010-01-14 02:57:42 +00:00
parent f67b06a931
commit 48f74e7cd6
25 changed files with 368 additions and 306 deletions

View File

@ -20,6 +20,9 @@ import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
/** /**
@ -42,9 +45,9 @@ public class DiscardClient {
} }
// Parse options. // Parse options.
String host = args[0]; final String host = args[0];
int port = Integer.parseInt(args[1]); final int port = Integer.parseInt(args[1]);
int firstMessageSize; final int firstMessageSize;
if (args.length == 3) { if (args.length == 3) {
firstMessageSize = Integer.parseInt(args[2]); firstMessageSize = Integer.parseInt(args[2]);
} else { } else {
@ -57,9 +60,13 @@ public class DiscardClient {
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the default event pipeline. // Set up the pipeline factory.
DiscardClientHandler handler = new DiscardClientHandler(firstMessageSize); bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
bootstrap.getPipeline().addLast("handler", handler); public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new DiscardClientHandler(firstMessageSize));
}
});
// Start the connection attempt. // Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

View File

@ -19,6 +19,9 @@ import java.net.InetSocketAddress;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
/** /**
@ -38,14 +41,14 @@ public class DiscardServer {
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the default event pipeline. // Set up the pipeline factory.
DiscardServerHandler handler = new DiscardServerHandler(); bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
bootstrap.getPipeline().addLast("handler", handler); public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new DiscardServerHandler());
}
});
// Bind and start to accept incoming connections. // Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(8080)); bootstrap.bind(new InetSocketAddress(8080));
// Start performance monitor.
new ThroughputMonitor(handler).start();
} }
} }

View File

@ -1,55 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.example.discard;
/**
* Measures and prints the current throughput every 3 seconds.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (trustin@gmail.com)
*
* @version $Rev$, $Date$
*/
public class ThroughputMonitor extends Thread {
private final DiscardServerHandler handler;
public ThroughputMonitor(DiscardServerHandler handler) {
this.handler = handler;
}
@Override
public void run() {
long oldCounter = handler.getTransferredBytes();
long startTime = System.currentTimeMillis();
for (;;) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
long newCounter = handler.getTransferredBytes();
System.err.format(
"%4.3f MiB/s%n",
(newCounter - oldCounter) * 1000.0 / (endTime - startTime) /
1048576.0);
oldCounter = newCounter;
startTime = endTime;
}
}
}

View File

@ -20,6 +20,9 @@ import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
/** /**
@ -46,9 +49,9 @@ public class EchoClient {
} }
// Parse options. // Parse options.
String host = args[0]; final String host = args[0];
int port = Integer.parseInt(args[1]); final int port = Integer.parseInt(args[1]);
int firstMessageSize; final int firstMessageSize;
if (args.length == 3) { if (args.length == 3) {
firstMessageSize = Integer.parseInt(args[2]); firstMessageSize = Integer.parseInt(args[2]);
} else { } else {
@ -61,9 +64,13 @@ public class EchoClient {
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the default event pipeline. // Set up the pipeline factory.
EchoClientHandler handler = new EchoClientHandler(firstMessageSize); bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
bootstrap.getPipeline().addLast("handler", handler); public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new EchoClientHandler(firstMessageSize));
}
});
// Start the connection attempt. // Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

View File

@ -19,6 +19,9 @@ import java.net.InetSocketAddress;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
/** /**
@ -39,14 +42,14 @@ public class EchoServer {
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the default event pipeline. // Set up the pipeline factory.
EchoServerHandler handler = new EchoServerHandler(); bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
bootstrap.getPipeline().addLast("handler", handler); public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new EchoServerHandler());
}
});
// Bind and start to accept incoming connections. // Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(8080)); bootstrap.bind(new InetSocketAddress(8080));
// Start performance monitor.
new ThroughputMonitor(handler).start();
} }
} }

View File

@ -1,69 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.example.echo;
import org.jboss.netty.channel.ChannelHandler;
/**
* Measures and prints the current throughput every 3 seconds.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (trustin@gmail.com)
*
* @version $Rev$, $Date$
*/
public class ThroughputMonitor extends Thread {
private final ChannelHandler handler;
public ThroughputMonitor(EchoClientHandler handler) {
this.handler = handler;
}
public ThroughputMonitor(EchoServerHandler handler) {
this.handler = handler;
}
@Override
public void run() {
long oldCounter = getTransferredBytes();
long startTime = System.currentTimeMillis();
for (;;) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
long newCounter = getTransferredBytes();
System.err.format(
"%4.3f MiB/s%n",
(newCounter - oldCounter) * 1000.0 / (endTime - startTime) /
1048576.0);
oldCounter = newCounter;
startTime = endTime;
}
}
private long getTransferredBytes() {
if (handler instanceof EchoClientHandler) {
return ((EchoClientHandler) handler).getTransferredBytes();
} else {
return ((EchoServerHandler) handler).getTransferredBytes();
}
}
}

View File

@ -23,6 +23,9 @@ import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.http.HttpTunnelingClientSocketChannelFactory; import org.jboss.netty.channel.socket.http.HttpTunnelingClientSocketChannelFactory;
import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory; import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
import org.jboss.netty.example.securechat.SecureChatSslContextFactory; import org.jboss.netty.example.securechat.SecureChatSslContextFactory;
@ -62,10 +65,14 @@ public class HttpTunnelingClientExample {
new HttpTunnelingClientSocketChannelFactory( new HttpTunnelingClientSocketChannelFactory(
new OioClientSocketChannelFactory(Executors.newCachedThreadPool()))); new OioClientSocketChannelFactory(Executors.newCachedThreadPool())));
// Set up the default event pipeline. b.setPipelineFactory(new ChannelPipelineFactory() {
b.getPipeline().addLast("decoder", new StringDecoder()); public ChannelPipeline getPipeline() throws Exception {
b.getPipeline().addLast("encoder", new StringEncoder()); return Channels.pipeline(
b.getPipeline().addLast("handler", new LoggingHandler(InternalLogLevel.INFO)); new StringDecoder(),
new StringEncoder(),
new LoggingHandler(InternalLogLevel.INFO));
}
});
// Set additional options required by the HTTP tunneling transport. // Set additional options required by the HTTP tunneling transport.
b.setOption("serverName", uri.getHost()); b.setOption("serverName", uri.getHost());

View File

@ -21,6 +21,9 @@ import java.io.InputStreamReader;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.local.DefaultLocalClientChannelFactory; import org.jboss.netty.channel.local.DefaultLocalClientChannelFactory;
import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory; import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
import org.jboss.netty.channel.local.LocalAddress; import org.jboss.netty.channel.local.LocalAddress;
@ -55,10 +58,15 @@ public class LocalExample {
ClientBootstrap cb = new ClientBootstrap( ClientBootstrap cb = new ClientBootstrap(
new DefaultLocalClientChannelFactory()); new DefaultLocalClientChannelFactory());
// Set up the default client-side event pipeline. // Set up the client-side pipeline factory.
cb.getPipeline().addLast("decoder", new StringDecoder()); cb.setPipelineFactory(new ChannelPipelineFactory() {
cb.getPipeline().addLast("encoder", new StringEncoder()); public ChannelPipeline getPipeline() throws Exception {
cb.getPipeline().addLast("handler", new LoggingHandler(InternalLogLevel.INFO)); return Channels.pipeline(
new StringDecoder(),
new StringEncoder(),
new LoggingHandler(InternalLogLevel.INFO));
}
});
// Make the connection attempt to the server. // Make the connection attempt to the server.
ChannelFuture channelFuture = cb.connect(socketAddress); ChannelFuture channelFuture = cb.connect(socketAddress);

View File

@ -20,6 +20,9 @@ import java.util.concurrent.TimeUnit;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.local.DefaultLocalClientChannelFactory; import org.jboss.netty.channel.local.DefaultLocalClientChannelFactory;
import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory; import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
import org.jboss.netty.channel.local.LocalAddress; import org.jboss.netty.channel.local.LocalAddress;
@ -54,9 +57,14 @@ public class LocalExampleMultthreaded {
ClientBootstrap cb = new ClientBootstrap( ClientBootstrap cb = new ClientBootstrap(
new DefaultLocalClientChannelFactory()); new DefaultLocalClientChannelFactory());
cb.getPipeline().addLast("decoder", new StringDecoder()); cb.setPipelineFactory(new ChannelPipelineFactory() {
cb.getPipeline().addLast("encoder", new StringEncoder()); public ChannelPipeline getPipeline() throws Exception {
cb.getPipeline().addLast("handler", new LoggingHandler(InternalLogLevel.INFO)); return Channels.pipeline(
new StringDecoder(),
new StringEncoder(),
new LoggingHandler(InternalLogLevel.INFO));
}
});
// Read commands from array // Read commands from array
String[] commands = { "First", "Second", "Third", "quit" }; String[] commands = { "First", "Second", "Third", "quit" };

View File

@ -19,8 +19,13 @@ import java.net.InetSocketAddress;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.example.echo.EchoClient; import org.jboss.netty.example.echo.EchoClient;
import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
/** /**
* Modification of {@link EchoClient} which utilizes Java object serialization. * Modification of {@link EchoClient} which utilizes Java object serialization.
@ -42,9 +47,9 @@ public class ObjectEchoClient {
} }
// Parse options. // Parse options.
String host = args[0]; final String host = args[0];
int port = Integer.parseInt(args[1]); final int port = Integer.parseInt(args[1]);
int firstMessageSize; final int firstMessageSize;
if (args.length == 3) { if (args.length == 3) {
firstMessageSize = Integer.parseInt(args[2]); firstMessageSize = Integer.parseInt(args[2]);
@ -58,14 +63,17 @@ public class ObjectEchoClient {
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the default event pipeline. // Set up the pipeline factory.
ObjectEchoHandler handler = new ObjectEchoHandler(firstMessageSize); bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
bootstrap.getPipeline().addLast("handler", handler); public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new ObjectEncoder(),
new ObjectDecoder(),
new ObjectEchoClientHandler(firstMessageSize));
}
});
// Start the connection attempt. // Start the connection attempt.
bootstrap.connect(new InetSocketAddress(host, port)); bootstrap.connect(new InetSocketAddress(host, port));
// Start performance monitor.
new ThroughputMonitor(handler).start();
} }
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2009 Red Hat, Inc. * Copyright 2010 Red Hat, Inc.
* *
* Red Hat licenses this file to you under the Apache License, version 2.0 * Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the * (the "License"); you may not use this file except in compliance with the
@ -29,12 +29,11 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
/** /**
* Handles both client-side and server-side handler depending on which * Handler implementation for the object echo client. It initiates the
* constructor was called. * ping-pong traffic between the object echo client and server by sending the
* first message to the server.
* *
* @author The Netty Project (netty-dev@lists.jboss.org) * @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (trustin@gmail.com) * @author Trustin Lee (trustin@gmail.com)
@ -42,25 +41,18 @@ import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
* @version $Rev$, $Date$ * @version $Rev$, $Date$
*/ */
@ChannelPipelineCoverage("all") @ChannelPipelineCoverage("all")
public class ObjectEchoHandler extends SimpleChannelUpstreamHandler { public class ObjectEchoClientHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger( private static final Logger logger = Logger.getLogger(
ObjectEchoHandler.class.getName()); ObjectEchoClientHandler.class.getName());
private final List<Integer> firstMessage; private final List<Integer> firstMessage;
private final AtomicLong transferredMessages = new AtomicLong(); private final AtomicLong transferredMessages = new AtomicLong();
/**
* Creates a server-side handler.
*/
public ObjectEchoHandler() {
firstMessage = new ArrayList<Integer>();
}
/** /**
* Creates a client-side handler. * Creates a client-side handler.
*/ */
public ObjectEchoHandler(int firstMessageSize) { public ObjectEchoClientHandler(int firstMessageSize) {
if (firstMessageSize <= 0) { if (firstMessageSize <= 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"firstMessageSize: " + firstMessageSize); "firstMessageSize: " + firstMessageSize);
@ -85,22 +77,11 @@ public class ObjectEchoHandler extends SimpleChannelUpstreamHandler {
super.handleUpstream(ctx, e); super.handleUpstream(ctx, e);
} }
@Override
public void channelOpen(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
// Add encoder and decoder as soon as a new channel is created so that
// a Java object is serialized and deserialized.
e.getChannel().getPipeline().addFirst("encoder", new ObjectEncoder());
e.getChannel().getPipeline().addFirst("decoder", new ObjectDecoder());
}
@Override @Override
public void channelConnected( public void channelConnected(
ChannelHandlerContext ctx, ChannelStateEvent e) { ChannelHandlerContext ctx, ChannelStateEvent e) {
// Send the first message if this handler is a client-side handler. // Send the first message if this handler is a client-side handler.
if (!firstMessage.isEmpty()) { e.getChannel().write(firstMessage);
e.getChannel().write(firstMessage);
}
} }
@Override @Override

View File

@ -19,8 +19,13 @@ import java.net.InetSocketAddress;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.example.echo.EchoServer; import org.jboss.netty.example.echo.EchoServer;
import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
/** /**
* Modification of {@link EchoServer} which utilizes Java object serialization. * Modification of {@link EchoServer} which utilizes Java object serialization.
@ -39,14 +44,17 @@ public class ObjectEchoServer {
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the default event pipeline. // Set up the pipeline factory.
ObjectEchoHandler handler = new ObjectEchoHandler(); bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
bootstrap.getPipeline().addLast("handler", handler); public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new ObjectEncoder(),
new ObjectDecoder(),
new ObjectEchoServerHandler());
}
});
// Bind and start to accept incoming connections. // Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(8080)); bootstrap.bind(new InetSocketAddress(8080));
// Start performance monitor.
new ThroughputMonitor(handler).start();
} }
} }

View File

@ -0,0 +1,79 @@
/*
* Copyright 2010 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.example.objectecho;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
/**
* Handles both client-side and server-side handler depending on which
* constructor was called.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (trustin@gmail.com)
*
* @version $Rev$, $Date$
*/
@ChannelPipelineCoverage("all")
public class ObjectEchoServerHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(
ObjectEchoServerHandler.class.getName());
private final AtomicLong transferredMessages = new AtomicLong();
public long getTransferredMessages() {
return transferredMessages.get();
}
@Override
public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent &&
((ChannelStateEvent) e).getState() != ChannelState.INTEREST_OPS) {
logger.info(e.toString());
}
super.handleUpstream(ctx, e);
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
// Echo back the received object to the client.
transferredMessages.incrementAndGet();
e.getChannel().write(e.getMessage());
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
}

View File

@ -1,54 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.example.objectecho;
/**
* Measures and prints the current throughput every 3 seconds.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (trustin@gmail.com)
*
* @version $Rev$, $Date$
*/
public class ThroughputMonitor extends Thread {
private final ObjectEchoHandler handler;
public ThroughputMonitor(ObjectEchoHandler handler) {
this.handler = handler;
}
@Override
public void run() {
long oldCounter = handler.getTransferredMessages();
long startTime = System.currentTimeMillis();
for (;;) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
long newCounter = handler.getTransferredMessages();
System.err.format(
"%4.3f msgs/s%n",
(newCounter - oldCounter) * 1000 / (double) (endTime - startTime));
oldCounter = newCounter;
startTime = endTime;
}
}
}

View File

@ -20,6 +20,8 @@ import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap; import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory; import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.socket.DatagramChannel; import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelFactory; import org.jboss.netty.channel.socket.DatagramChannelFactory;
@ -46,11 +48,15 @@ public class QuoteOfTheMomentClient {
ConnectionlessBootstrap b = new ConnectionlessBootstrap(f); ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
// Configure the pipeline. // Configure the pipeline factory.
ChannelPipeline p = b.getPipeline(); b.setPipelineFactory(new ChannelPipelineFactory() {
p.addLast("encoder", new StringEncoder(CharsetUtil.ISO_8859_1)); public ChannelPipeline getPipeline() throws Exception {
p.addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1)); return Channels.pipeline(
p.addLast("handler", new QuoteOfTheMomentClientHandler()); new StringEncoder(CharsetUtil.ISO_8859_1),
new StringDecoder(CharsetUtil.ISO_8859_1),
new QuoteOfTheMomentClientHandler());
}
});
// Enable broadcast // Enable broadcast
b.setOption("broadcast", "true"); b.setOption("broadcast", "true");

View File

@ -20,6 +20,8 @@ import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap; import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory; import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.socket.DatagramChannelFactory; import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory; import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
@ -45,11 +47,15 @@ public class QuoteOfTheMomentServer {
ConnectionlessBootstrap b = new ConnectionlessBootstrap(f); ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
// Configure the pipeline. // Configure the pipeline factory.
ChannelPipeline p = b.getPipeline(); b.setPipelineFactory(new ChannelPipelineFactory() {
p.addLast("encoder", new StringEncoder(CharsetUtil.ISO_8859_1)); public ChannelPipeline getPipeline() throws Exception {
p.addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1)); return Channels.pipeline(
p.addLast("handler", new QuoteOfTheMomentServerHandler()); new StringEncoder(CharsetUtil.ISO_8859_1),
new StringDecoder(CharsetUtil.ISO_8859_1),
new QuoteOfTheMomentServerHandler());
}
});
// Enable broadcast // Enable broadcast
b.setOption("broadcast", "false"); b.setOption("broadcast", "false");

View File

@ -56,8 +56,8 @@ public class SecureChatClient {
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
SecureChatClientHandler handler = new SecureChatClientHandler(); // Configure the pipeline factory.
bootstrap.setPipelineFactory(new SecureChatPipelineFactory(handler)); bootstrap.setPipelineFactory(new SecureChatClientPipelineFactory());
// Start the connection attempt. // Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

View File

@ -0,0 +1,68 @@
/*
* Copyright 2010 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.example.securechat;
import static org.jboss.netty.channel.Channels.*;
import javax.net.ssl.SSLEngine;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import org.jboss.netty.handler.ssl.SslHandler;
/**
* Creates a newly configured {@link ChannelPipeline} for a new channel.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (trustin@gmail.com)
*
* @version $Rev$, $Date$
*
*/
public class SecureChatClientPipelineFactory implements
ChannelPipelineFactory {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();
// Add SSL handler first to encrypt and decrypt everything.
// In this example, we use a bogus certificate in the server side
// and accept any invalid certificates in the client side.
// You will need something more complicated to identify both
// and server in the real world.
SSLEngine engine =
SecureChatSslContextFactory.getClientContext().createSSLEngine();
engine.setUseClientMode(true);
pipeline.addLast("ssl", new SslHandler(engine));
// On top of the SSL handler, add the text line codec.
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// and then business logic.
pipeline.addLast("handler", new SecureChatClientHandler());
return pipeline;
}
}

View File

@ -39,9 +39,8 @@ public class SecureChatServer {
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
SecureChatServerHandler handler = new SecureChatServerHandler(); // Configure the pipeline factory.
bootstrap.setPipelineFactory(new SecureChatServerPipelineFactory());
bootstrap.setPipelineFactory(new SecureChatPipelineFactory(handler));
// Bind and start to accept incoming connections. // Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(8080)); bootstrap.bind(new InetSocketAddress(8080));

View File

@ -19,7 +19,6 @@ import static org.jboss.netty.channel.Channels.*;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
@ -37,15 +36,9 @@ import org.jboss.netty.handler.ssl.SslHandler;
* @version $Rev$, $Date$ * @version $Rev$, $Date$
* *
*/ */
public class SecureChatPipelineFactory implements public class SecureChatServerPipelineFactory implements
ChannelPipelineFactory { ChannelPipelineFactory {
private final ChannelHandler handler;
public SecureChatPipelineFactory(ChannelHandler handler) {
this.handler = handler;
}
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline(); ChannelPipeline pipeline = pipeline();
@ -55,14 +48,9 @@ public class SecureChatPipelineFactory implements
// You will need something more complicated to identify both // You will need something more complicated to identify both
// and server in the real world. // and server in the real world.
SSLEngine engine; SSLEngine engine =
if (handler instanceof SecureChatClientHandler) { SecureChatSslContextFactory.getServerContext().createSSLEngine();
engine = SecureChatSslContextFactory.getClientContext().createSSLEngine(); engine.setUseClientMode(false);
engine.setUseClientMode(true);
} else {
engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
engine.setUseClientMode(false);
}
pipeline.addLast("ssl", new SslHandler(engine)); pipeline.addLast("ssl", new SslHandler(engine));
@ -73,7 +61,7 @@ public class SecureChatPipelineFactory implements
pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("encoder", new StringEncoder());
// and then business logic. // and then business logic.
pipeline.addLast("handler", handler); pipeline.addLast("handler", new SecureChatServerHandler());
return pipeline; return pipeline;
} }

View File

@ -54,8 +54,8 @@ public class TelnetClient {
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
TelnetClientHandler handler = new TelnetClientHandler(); // Configure the pipeline factory.
bootstrap.setPipelineFactory(new TelnetPipelineFactory(handler)); bootstrap.setPipelineFactory(new TelnetClientPipelineFactory());
// Start the connection attempt. // Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

View File

@ -17,7 +17,6 @@ package org.jboss.netty.example.telnet;
import static org.jboss.netty.channel.Channels.*; import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
@ -34,15 +33,9 @@ import org.jboss.netty.handler.codec.string.StringEncoder;
* @version $Rev$, $Date$ * @version $Rev$, $Date$
* *
*/ */
public class TelnetPipelineFactory implements public class TelnetClientPipelineFactory implements
ChannelPipelineFactory { ChannelPipelineFactory {
private final ChannelHandler handler;
public TelnetPipelineFactory(ChannelHandler handler) {
this.handler = handler;
}
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
// Create a default pipeline implementation. // Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline(); ChannelPipeline pipeline = pipeline();
@ -54,7 +47,7 @@ public class TelnetPipelineFactory implements
pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("encoder", new StringEncoder());
// and then business logic. // and then business logic.
pipeline.addLast("handler", handler); pipeline.addLast("handler", new TelnetClientHandler());
return pipeline; return pipeline;
} }

View File

@ -38,8 +38,8 @@ public class TelnetServer {
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
TelnetServerHandler handler = new TelnetServerHandler(); // Configure the pipeline factory.
bootstrap.setPipelineFactory(new TelnetPipelineFactory(handler)); bootstrap.setPipelineFactory(new TelnetServerPipelineFactory());
// Bind and start to accept incoming connections. // Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(8080)); bootstrap.bind(new InetSocketAddress(8080));

View File

@ -0,0 +1,54 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.example.telnet;
import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
/**
* Creates a newly configured {@link ChannelPipeline} for a new channel.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (trustin@gmail.com)
*
* @version $Rev$, $Date$
*
*/
public class TelnetServerPipelineFactory implements
ChannelPipelineFactory {
public ChannelPipeline getPipeline() throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline();
// Add the text line codec combination first,
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// and then business logic.
pipeline.addLast("handler", new TelnetServerHandler());
return pipeline;
}
}

View File

@ -19,6 +19,9 @@ import java.net.InetSocketAddress;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.timeout.ReadTimeoutHandler; import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
import org.jboss.netty.util.HashedWheelTimer; import org.jboss.netty.util.HashedWheelTimer;
@ -57,18 +60,22 @@ public class UptimeClient {
int port = Integer.parseInt(args[1]); int port = Integer.parseInt(args[1]);
// Initialize the timer that schedules subsequent reconnection attempts. // Initialize the timer that schedules subsequent reconnection attempts.
Timer timer = new HashedWheelTimer(); final Timer timer = new HashedWheelTimer();
// Configure the client. // Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap( final ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory( new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
bootstrap.getPipeline().addLast( // Configure the pipeline factory.
"timeout", new ReadTimeoutHandler(timer, READ_TIMEOUT)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
bootstrap.getPipeline().addLast( public ChannelPipeline getPipeline() throws Exception {
"handler", new UptimeClientHandler(bootstrap, timer)); return Channels.pipeline(
new ReadTimeoutHandler(timer, READ_TIMEOUT),
new UptimeClientHandler(bootstrap, timer));
}
});
bootstrap.setOption( bootstrap.setOption(
"remoteAddress", new InetSocketAddress(host, port)); "remoteAddress", new InetSocketAddress(host, port));