* Implemented old blocking I/O based UDP transport - supports multicast, too

* Added Quote of the Moment example to show how to use the UDP transport
This commit is contained in:
Trustin Lee 2009-03-12 14:48:48 +00:00
parent 5754d9d87f
commit dbbd7ed3f5
8 changed files with 894 additions and 0 deletions

View File

@ -0,0 +1,144 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.socket.oio;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelConfig;
import org.jboss.netty.channel.socket.DefaultDatagramChannelConfig;
/**
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
*
* @version $Rev$, $Date$
*
*/
final class OioDatagramChannel extends AbstractChannel
implements DatagramChannel {
final MulticastSocket socket;
private final DatagramChannelConfig config;
volatile Thread workerThread;
OioDatagramChannel(
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink) {
super(null, factory, pipeline, sink);
try {
socket = new MulticastSocket(null);
} catch (IOException e) {
throw new ChannelException("Failed to open a datagram socket.", e);
}
config = new DefaultDatagramChannelConfig(socket);
}
public DatagramChannelConfig getConfig() {
return config;
}
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) socket.getLocalSocketAddress();
}
public InetSocketAddress getRemoteAddress() {
return (InetSocketAddress) socket.getRemoteSocketAddress();
}
public boolean isBound() {
return isOpen() && socket.isBound();
}
public boolean isConnected() {
return isOpen() && socket.isConnected();
}
@Override
protected boolean setClosed() {
return super.setClosed();
}
@Override
protected void setInterestOpsNow(int interestOps) {
super.setInterestOpsNow(interestOps);
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
return super.write(message, null);
} else {
return super.write(message, remoteAddress);
}
}
public void joinGroup(InetAddress multicastAddress) {
try {
socket.joinGroup(multicastAddress);
} catch (IOException e) {
throw new ChannelException(e);
}
}
public void joinGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
try {
socket.joinGroup(multicastAddress, networkInterface);
} catch (IOException e) {
throw new ChannelException(e);
}
}
public void leaveGroup(InetAddress multicastAddress) {
try {
socket.leaveGroup(multicastAddress);
} catch (IOException e) {
throw new ChannelException(e);
}
}
public void leaveGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
try {
socket.leaveGroup(multicastAddress, networkInterface);
} catch (IOException e) {
throw new ChannelException(e);
}
}
}

View File

@ -0,0 +1,110 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.socket.oio;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.util.ExecutorUtil;
/**
* A {@link DatagramChannelFactory} which creates a client-side blocking
* I/O based {@link DatagramChannel}. It utilizes the good old blocking I/O API
* which is known to yield better throughput and latency when there are
* relatively small number of connections to serve.
*
* <h3>How threads work</h3>
* <p>
* There is only one type of threads in {@link OioDatagramChannelFactory};
* worker threads.
*
* <h4>Worker threads</h4>
* <p>
* Each {@link Channel} has a dedicated worker thread, just like a
* traditional blocking I/O thread model.
*
* <h3>Life cycle of threads and graceful shutdown</h3>
* <p>
* Worker threads are acquired from the {@link Executor} which was specified
* when a {@link OioDatagramChannelFactory} was created (i.e. {@code workerExecutor}.)
* Therefore, you should make sure the specified {@link Executor} is able to
* lend the sufficient number of threads.
* <p>
* Worker threads are acquired lazily, and then released when there's nothing
* left to process. All the related resources are also released when the
* worker threads are released. Therefore, to shut down a service gracefully,
* you should do the following:
*
* <ol>
* <li>close all channels created by the factory, and</li>
* <li>call {@link #releaseExternalResources()}.</li>
* </ol>
*
* Please make sure not to shut down the executor until all channels are
* closed. Otherwise, you will end up with a {@link RejectedExecutionException}
* and the related resources might not be released properly.
*
* <h3>Limitation</h3>
* <p>
* A {@link DatagramChannel} created by this factory does not support asynchronous
* operations. Any I/O requests such as {@code "write"} will be performed in a
* blocking manner.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
*
* @version $Rev$, $Date$
*
* @apiviz.landmark
*/
public class OioDatagramChannelFactory implements DatagramChannelFactory {
private final Executor workerExecutor;
final OioDatagramPipelineSink sink;
/**
* Creates a new instance.
*
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
*/
public OioDatagramChannelFactory(Executor workerExecutor) {
if (workerExecutor == null) {
throw new NullPointerException("workerExecutor");
}
this.workerExecutor = workerExecutor;
sink = new OioDatagramPipelineSink(workerExecutor);
}
public DatagramChannel newChannel(ChannelPipeline pipeline) {
return new OioDatagramChannel(this, pipeline, sink);
}
public void releaseExternalResources() {
ExecutorUtil.terminate(workerExecutor);
}
}

View File

@ -0,0 +1,176 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.socket.oio;
import static org.jboss.netty.channel.Channels.*;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.ThreadRenamingRunnable;
/**
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
*
* @version $Rev$, $Date$
*
*/
class OioDatagramPipelineSink extends AbstractChannelSink {
private final Executor workerExecutor;
OioDatagramPipelineSink(Executor workerExecutor) {
this.workerExecutor = workerExecutor;
}
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
OioDatagramChannel channel = (OioDatagramChannel) e.getChannel();
ChannelFuture future = e.getFuture();
if (e instanceof ChannelStateEvent) {
ChannelStateEvent stateEvent = (ChannelStateEvent) e;
ChannelState state = stateEvent.getState();
Object value = stateEvent.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
OioDatagramWorker.close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
} else {
OioDatagramWorker.close(channel, future);
}
break;
case CONNECTED:
if (value != null) {
connect(channel, future, (SocketAddress) value);
} else {
OioDatagramWorker.disconnect(channel, future);
}
break;
case INTEREST_OPS:
OioDatagramWorker.setInterestOps(channel, future, ((Integer) value).intValue());
break;
}
} else if (e instanceof MessageEvent) {
MessageEvent evt = (MessageEvent) e;
OioDatagramWorker.write(
channel, future, evt.getMessage(), evt.getRemoteAddress());
}
}
private void bind(
OioDatagramChannel channel, ChannelFuture future,
SocketAddress localAddress) {
boolean bound = false;
boolean workerStarted = false;
try {
channel.socket.bind(localAddress);
bound = true;
// Fire events
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
// Start the business.
workerExecutor.execute(new ThreadRenamingRunnable(
new OioDatagramWorker(channel),
"Old I/O datagram worker (channelId: " + channel.getId() + ", " +
channel.getLocalAddress() + ')'));
workerStarted = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (bound && !workerStarted) {
OioDatagramWorker.close(channel, future);
}
}
}
private void connect(
OioDatagramChannel channel, ChannelFuture future,
SocketAddress remoteAddress) {
boolean bound = channel.isBound();
boolean connected = false;
boolean workerStarted = false;
future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
try {
channel.socket.connect(remoteAddress);
connected = true;
// Fire events.
future.setSuccess();
if (!bound) {
fireChannelBound(channel, channel.getLocalAddress());
}
fireChannelConnected(channel, channel.getRemoteAddress());
String threadName =
"Old I/O datagram worker (channelId: " + channel.getId() + ", " +
channel.getLocalAddress() + " => " +
channel.getRemoteAddress() + ')';
if (!bound) {
// Start the business.
workerExecutor.execute(new ThreadRenamingRunnable(
new OioDatagramWorker(channel), threadName));
} else {
// Worker started by bind() - just rename.
Thread workerThread = channel.workerThread;
if (workerThread != null) {
try {
workerThread.setName(threadName);
} catch (SecurityException e) {
// Ignore.
}
}
}
workerStarted = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (connected && !workerStarted) {
OioDatagramWorker.close(channel, future);
}
}
}
}

View File

@ -0,0 +1,210 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.socket.oio;
import static org.jboss.netty.channel.Channels.*;
import java.net.DatagramPacket;
import java.net.MulticastSocket;
import java.net.SocketAddress;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
/**
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
*
* @version $Rev$, $Date$
*
*/
class OioDatagramWorker implements Runnable {
private final OioDatagramChannel channel;
OioDatagramWorker(OioDatagramChannel channel) {
this.channel = channel;
}
public void run() {
channel.workerThread = Thread.currentThread();
final MulticastSocket socket = channel.socket;
while (channel.isOpen()) {
synchronized (this) {
while (!channel.isReadable()) {
try {
// notify() is not called at all.
// close() and setInterestOps() calls Thread.interrupt()
this.wait();
} catch (InterruptedException e) {
if (!channel.isOpen()) {
break;
}
}
}
}
ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
byte[] buf = new byte[predictor.nextReceiveBufferSize()];
DatagramPacket packet = new DatagramPacket(buf, buf.length);
try {
socket.receive(packet);
} catch (Throwable t) {
if (!channel.socket.isClosed()) {
fireExceptionCaught(channel, t);
}
break;
}
ChannelBuffer buffer;
int readBytes = packet.getLength();
if (readBytes == buf.length) {
buffer = ChannelBuffers.wrappedBuffer(buf);
} else {
buffer = ChannelBuffers.wrappedBuffer(buf, 0, readBytes);
}
fireMessageReceived(channel, buffer, packet.getSocketAddress());
}
// Setting the workerThread to null will prevent any channel
// operations from interrupting this thread from now on.
channel.workerThread = null;
// Clean up.
close(channel, succeededFuture(channel));
}
static void write(
OioDatagramChannel channel, ChannelFuture future,
Object message, SocketAddress remoteAddress) {
try {
// FIXME: Avoid extra copy.
ChannelBuffer a = (ChannelBuffer) message;
byte[] b = new byte[a.readableBytes()];
a.getBytes(0, b);
channel.socket.send(new DatagramPacket(b, b.length, remoteAddress));
fireWriteComplete(channel, b.length);
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
static void setInterestOps(
OioDatagramChannel channel, ChannelFuture future, int interestOps) {
// Override OP_WRITE flag - a user cannot change this flag.
interestOps &= ~Channel.OP_WRITE;
interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
boolean changed = false;
try {
if (channel.getInterestOps() != interestOps) {
if ((interestOps & Channel.OP_READ) != 0) {
channel.setInterestOpsNow(Channel.OP_READ);
} else {
channel.setInterestOpsNow(Channel.OP_NONE);
}
changed = true;
}
future.setSuccess();
if (changed) {
// Notify the worker so it stops or continues reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
}
channel.setInterestOpsNow(interestOps);
fireChannelInterestChanged(channel);
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
static void disconnect(OioDatagramChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected();
try {
channel.socket.disconnect();
future.setSuccess();
if (connected) {
fireChannelDisconnected(channel);
}
Thread workerThread = channel.workerThread;
if (workerThread != null) {
try {
workerThread.setName(
"Old I/O datagram worker (channelId: " +
channel.getId() + ", " + channel.getLocalAddress() + ')');
} catch (SecurityException e) {
// Ignore.
}
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
static void close(OioDatagramChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected();
boolean bound = channel.isBound();
try {
channel.socket.close();
future.setSuccess();
if (channel.setClosed()) {
if (connected) {
// Notify the worker so it stops reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
}
fireChannelDisconnected(channel);
}
if (bound) {
fireChannelUnbound(channel);
}
fireChannelClosed(channel);
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
}

View File

@ -0,0 +1,69 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.example.qotm;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
public class QuoteOfTheMomentClient {
public static void main(String[] args) throws Exception {
DatagramChannelFactory f =
new OioDatagramChannelFactory(Executors.newCachedThreadPool());
ChannelPipeline p = Channels.pipeline();
p.addLast("encoder", new StringEncoder("UTF-8"));
p.addLast("decoder", new StringDecoder("UTF-8"));
p.addLast("handler", new QuoteOfTheMomentClientHandler());
DatagramChannel c = f.newChannel(p);
c.getConfig().setBroadcast(true);
c.bind(new InetSocketAddress(0)).awaitUninterruptibly();
// Broadcast the QOTM request to port 8080.
c.write("QOTM?", new InetSocketAddress("255.255.255.255", 8080));
// QuoteOfTheMomentClientHandler will close the DatagramChannel when a
// response is received. If the channel is not closed within 5 seconds,
// print an error message and quit.
if (!c.getCloseFuture().awaitUninterruptibly(5000)) {
System.err.println("QOTM request timed out.");
c.close().awaitUninterruptibly();
}
f.releaseExternalResources();
}
}

View File

@ -0,0 +1,55 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.example.qotm;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
@ChannelPipelineCoverage("all")
public class QuoteOfTheMomentClientHandler extends SimpleChannelUpstreamHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
String msg = (String) e.getMessage();
if (msg.startsWith("QOTM: ")) {
System.out.println("Quote of the Moment: " + msg.substring(6));
e.getChannel().close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
e.getCause().printStackTrace();
e.getChannel().close();
}
}

View File

@ -0,0 +1,56 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.example.qotm;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
public class QuoteOfTheMomentServer {
public static void main(String[] args) throws Exception {
DatagramChannelFactory f =
new OioDatagramChannelFactory(Executors.newCachedThreadPool());
ChannelPipeline p = Channels.pipeline();
p.addLast("encoder", new StringEncoder("UTF-8"));
p.addLast("decoder", new StringDecoder("UTF-8"));
p.addLast("handler", new QuoteOfTheMomentServerHandler());
DatagramChannel c = f.newChannel(p);
c.getConfig().setBroadcast(false);
c.bind(new InetSocketAddress(8080)).awaitUninterruptibly();
}
}

View File

@ -0,0 +1,74 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.example.qotm;
import java.util.Random;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
@ChannelPipelineCoverage("all")
public class QuoteOfTheMomentServerHandler extends SimpleChannelUpstreamHandler {
private static final Random random = new Random();
// Quotes from Mohandas K. Gandhi:
private static final String[] quotes = new String[] {
"Where there is love there is life.",
"First they ignore you, then they laugh at you, then they fight you, then you win.",
"Be the change you want to see in the world.",
"The weak can never forgive. Forgiveness is the attribute of the strong.",
};
private static String nextQuote() {
int quoteId;
synchronized (random) {
quoteId = random.nextInt(quotes.length);
}
return quotes[quoteId];
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
String msg = (String) e.getMessage();
if (msg.equals("QOTM?")) {
e.getChannel().write("QOTM: " + nextQuote(), e.getRemoteAddress());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
e.getCause().printStackTrace();
// We don't close the channel because we can keep serving requests.
}
}