Split DiscardHandler into DiscardClientHandler and DiscardServerHandler for easier understanding
This commit is contained in:
parent
7de02097d0
commit
5e99787df6
@ -29,6 +29,14 @@ import org.jboss.netty.bootstrap.ClientBootstrap;
|
|||||||
import org.jboss.netty.channel.ChannelFactory;
|
import org.jboss.netty.channel.ChannelFactory;
|
||||||
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
|
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Keeps sending random data to the specified address.
|
||||||
|
*
|
||||||
|
* @author The Netty Project (netty-dev@lists.jboss.org)
|
||||||
|
* @author Trustin Lee (tlee@redhat.com)
|
||||||
|
*
|
||||||
|
* @version $Rev$, $Date$
|
||||||
|
*/
|
||||||
public class DiscardClient {
|
public class DiscardClient {
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
@ -58,7 +66,7 @@ public class DiscardClient {
|
|||||||
Executors.newCachedThreadPool());
|
Executors.newCachedThreadPool());
|
||||||
|
|
||||||
ClientBootstrap bootstrap = new ClientBootstrap(factory);
|
ClientBootstrap bootstrap = new ClientBootstrap(factory);
|
||||||
DiscardHandler handler = new DiscardHandler(firstMessageSize);
|
DiscardClientHandler handler = new DiscardClientHandler(firstMessageSize);
|
||||||
|
|
||||||
bootstrap.getPipeline().addLast("handler", handler);
|
bootstrap.getPipeline().addLast("handler", handler);
|
||||||
bootstrap.setOption("tcpNoDelay", true);
|
bootstrap.setOption("tcpNoDelay", true);
|
||||||
|
@ -38,22 +38,25 @@ import org.jboss.netty.channel.ExceptionEvent;
|
|||||||
import org.jboss.netty.channel.MessageEvent;
|
import org.jboss.netty.channel.MessageEvent;
|
||||||
import org.jboss.netty.channel.SimpleChannelHandler;
|
import org.jboss.netty.channel.SimpleChannelHandler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @author The Netty Project (netty-dev@lists.jboss.org)
|
||||||
|
* @author Trustin Lee (tlee@redhat.com)
|
||||||
|
*
|
||||||
|
* @version $Rev$, $Date$
|
||||||
|
*/
|
||||||
@ChannelPipelineCoverage("all")
|
@ChannelPipelineCoverage("all")
|
||||||
public class DiscardHandler extends SimpleChannelHandler {
|
public class DiscardClientHandler extends SimpleChannelHandler {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(
|
private static final Logger logger = Logger.getLogger(
|
||||||
DiscardHandler.class.getName());
|
DiscardClientHandler.class.getName());
|
||||||
|
|
||||||
private final Random random = new Random();
|
private final Random random = new Random();
|
||||||
private final int messageSize;
|
private final int messageSize;
|
||||||
private final AtomicLong transferredBytes = new AtomicLong();
|
private final AtomicLong transferredBytes = new AtomicLong();
|
||||||
|
|
||||||
public DiscardHandler() {
|
public DiscardClientHandler(int messageSize) {
|
||||||
this(0);
|
if (messageSize <= 0) {
|
||||||
}
|
|
||||||
|
|
||||||
public DiscardHandler(int messageSize) {
|
|
||||||
if (messageSize < 0) {
|
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"messageSize: " + messageSize);
|
"messageSize: " + messageSize);
|
||||||
}
|
}
|
||||||
@ -69,26 +72,32 @@ public class DiscardHandler extends SimpleChannelHandler {
|
|||||||
if (e instanceof ChannelStateEvent) {
|
if (e instanceof ChannelStateEvent) {
|
||||||
logger.info(e.toString());
|
logger.info(e.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Let SimpleChannelHandler call actual event handler methods below.
|
||||||
super.handleUpstream(ctx, e);
|
super.handleUpstream(ctx, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
|
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
|
||||||
|
// Send the initial messages.
|
||||||
generateTraffic(e);
|
generateTraffic(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
|
public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
|
||||||
|
// Keep sending messages whenever the current socket buffer has room.
|
||||||
generateTraffic(e);
|
generateTraffic(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
|
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
|
||||||
|
// Server is supposed to send nothing. Therefore, do nothing.
|
||||||
transferredBytes.addAndGet(((ChannelBuffer) e.getMessage()).readableBytes());
|
transferredBytes.addAndGet(((ChannelBuffer) e.getMessage()).readableBytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
|
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
|
||||||
|
// Close the connection when an exception is raised.
|
||||||
logger.log(
|
logger.log(
|
||||||
Level.WARNING,
|
Level.WARNING,
|
||||||
"Unexpected exception from downstream.",
|
"Unexpected exception from downstream.",
|
||||||
@ -108,10 +117,6 @@ public class DiscardHandler extends SimpleChannelHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private ChannelBuffer nextMessage() {
|
private ChannelBuffer nextMessage() {
|
||||||
if (messageSize == 0) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
byte[] content = new byte[messageSize];
|
byte[] content = new byte[messageSize];
|
||||||
random.nextBytes(content);
|
random.nextBytes(content);
|
||||||
return ChannelBuffers.wrappedBuffer(content);
|
return ChannelBuffers.wrappedBuffer(content);
|
@ -29,6 +29,14 @@ import org.jboss.netty.bootstrap.ServerBootstrap;
|
|||||||
import org.jboss.netty.channel.ChannelFactory;
|
import org.jboss.netty.channel.ChannelFactory;
|
||||||
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
|
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Discards any incoming data.
|
||||||
|
*
|
||||||
|
* @author The Netty Project (netty-dev@lists.jboss.org)
|
||||||
|
* @author Trustin Lee (tlee@redhat.com)
|
||||||
|
*
|
||||||
|
* @version $Rev$, $Date$
|
||||||
|
*/
|
||||||
public class DiscardServer {
|
public class DiscardServer {
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
@ -39,7 +47,7 @@ public class DiscardServer {
|
|||||||
Executors.newCachedThreadPool());
|
Executors.newCachedThreadPool());
|
||||||
|
|
||||||
ServerBootstrap bootstrap = new ServerBootstrap(factory);
|
ServerBootstrap bootstrap = new ServerBootstrap(factory);
|
||||||
DiscardHandler handler = new DiscardHandler();
|
DiscardServerHandler handler = new DiscardServerHandler();
|
||||||
|
|
||||||
bootstrap.getPipeline().addLast("handler", handler);
|
bootstrap.getPipeline().addLast("handler", handler);
|
||||||
bootstrap.setOption("child.tcpNoDelay", true);
|
bootstrap.setOption("child.tcpNoDelay", true);
|
||||||
|
@ -0,0 +1,82 @@
|
|||||||
|
/*
|
||||||
|
* JBoss, Home of Professional Open Source
|
||||||
|
*
|
||||||
|
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
|
||||||
|
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
|
||||||
|
* full listing of individual contributors.
|
||||||
|
*
|
||||||
|
* This is free software; you can redistribute it and/or modify it
|
||||||
|
* under the terms of the GNU Lesser General Public License as
|
||||||
|
* published by the Free Software Foundation; either version 2.1 of
|
||||||
|
* the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This software is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||||
|
* Lesser General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Lesser General Public
|
||||||
|
* License along with this software; if not, write to the Free
|
||||||
|
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
|
||||||
|
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
|
||||||
|
*/
|
||||||
|
package org.jboss.netty.example.discard;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.logging.Level;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
|
import org.jboss.netty.channel.ChannelEvent;
|
||||||
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||||
|
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||||
|
import org.jboss.netty.channel.ChannelStateEvent;
|
||||||
|
import org.jboss.netty.channel.ExceptionEvent;
|
||||||
|
import org.jboss.netty.channel.MessageEvent;
|
||||||
|
import org.jboss.netty.channel.SimpleChannelHandler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @author The Netty Project (netty-dev@lists.jboss.org)
|
||||||
|
* @author Trustin Lee (tlee@redhat.com)
|
||||||
|
*
|
||||||
|
* @version $Rev$, $Date$
|
||||||
|
*/
|
||||||
|
@ChannelPipelineCoverage("all")
|
||||||
|
public class DiscardServerHandler extends SimpleChannelHandler {
|
||||||
|
|
||||||
|
private static final Logger logger = Logger.getLogger(
|
||||||
|
DiscardServerHandler.class.getName());
|
||||||
|
|
||||||
|
private final AtomicLong transferredBytes = new AtomicLong();
|
||||||
|
|
||||||
|
public long getTransferredBytes() {
|
||||||
|
return transferredBytes.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
|
||||||
|
if (e instanceof ChannelStateEvent) {
|
||||||
|
logger.info(e.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Let SimpleChannelHandler call actual event handler methods below.
|
||||||
|
super.handleUpstream(ctx, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
|
||||||
|
// Discard received data silently by doing nothing.
|
||||||
|
transferredBytes.addAndGet(((ChannelBuffer) e.getMessage()).readableBytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
|
||||||
|
// Close the connection when an exception is raised.
|
||||||
|
logger.log(
|
||||||
|
Level.WARNING,
|
||||||
|
"Unexpected exception from downstream.",
|
||||||
|
e.getCause());
|
||||||
|
e.getChannel().close();
|
||||||
|
}
|
||||||
|
}
|
@ -24,15 +24,15 @@ package org.jboss.netty.example.discard;
|
|||||||
|
|
||||||
public class ThroughputMonitor extends Thread {
|
public class ThroughputMonitor extends Thread {
|
||||||
|
|
||||||
private final DiscardHandler echoHandler;
|
private final DiscardServerHandler handler;
|
||||||
|
|
||||||
public ThroughputMonitor(DiscardHandler echoHandler) {
|
public ThroughputMonitor(DiscardServerHandler echoHandler) {
|
||||||
this.echoHandler = echoHandler;
|
this.handler = echoHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
long oldCounter = echoHandler.getTransferredBytes();
|
long oldCounter = handler.getTransferredBytes();
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
for (;;) {
|
for (;;) {
|
||||||
try {
|
try {
|
||||||
@ -42,7 +42,7 @@ public class ThroughputMonitor extends Thread {
|
|||||||
}
|
}
|
||||||
|
|
||||||
long endTime = System.currentTimeMillis();
|
long endTime = System.currentTimeMillis();
|
||||||
long newCounter = echoHandler.getTransferredBytes();
|
long newCounter = handler.getTransferredBytes();
|
||||||
System.err.format(
|
System.err.format(
|
||||||
"%4.3f MiB/s%n",
|
"%4.3f MiB/s%n",
|
||||||
(newCounter - oldCounter) * 1000 / (endTime - startTime) /
|
(newCounter - oldCounter) * 1000 / (endTime - startTime) /
|
||||||
|
Loading…
Reference in New Issue
Block a user