diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramChannel.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramChannel.java new file mode 100644 index 0000000000..a87bf93416 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramChannel.java @@ -0,0 +1,144 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2008, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.channel.socket.oio; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketAddress; + +import org.jboss.netty.channel.AbstractChannel; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelSink; +import org.jboss.netty.channel.socket.DatagramChannel; +import org.jboss.netty.channel.socket.DatagramChannelConfig; +import org.jboss.netty.channel.socket.DefaultDatagramChannelConfig; + +/** + * + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * + * @version $Rev$, $Date$ + * + */ +final class OioDatagramChannel extends AbstractChannel + implements DatagramChannel { + + final MulticastSocket socket; + private final DatagramChannelConfig config; + volatile Thread workerThread; + + OioDatagramChannel( + ChannelFactory factory, + ChannelPipeline pipeline, + ChannelSink sink) { + + super(null, factory, pipeline, sink); + + try { + socket = new MulticastSocket(null); + } catch (IOException e) { + throw new ChannelException("Failed to open a datagram socket.", e); + } + config = new DefaultDatagramChannelConfig(socket); + } + + public DatagramChannelConfig getConfig() { + return config; + } + + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) socket.getLocalSocketAddress(); + } + + public InetSocketAddress getRemoteAddress() { + return (InetSocketAddress) socket.getRemoteSocketAddress(); + } + + public boolean isBound() { + return isOpen() && socket.isBound(); + } + + public boolean isConnected() { + return isOpen() && socket.isConnected(); + } + + @Override + protected boolean setClosed() { + return super.setClosed(); + } + + @Override + protected void setInterestOpsNow(int interestOps) { + super.setInterestOpsNow(interestOps); + } + + @Override + public ChannelFuture write(Object message, SocketAddress remoteAddress) { + if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) { + return super.write(message, null); + } else { + return super.write(message, remoteAddress); + } + } + + public void joinGroup(InetAddress multicastAddress) { + try { + socket.joinGroup(multicastAddress); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + public void joinGroup( + InetSocketAddress multicastAddress, NetworkInterface networkInterface) { + try { + socket.joinGroup(multicastAddress, networkInterface); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + public void leaveGroup(InetAddress multicastAddress) { + try { + socket.leaveGroup(multicastAddress); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + public void leaveGroup( + InetSocketAddress multicastAddress, NetworkInterface networkInterface) { + try { + socket.leaveGroup(multicastAddress, networkInterface); + } catch (IOException e) { + throw new ChannelException(e); + } + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramChannelFactory.java new file mode 100644 index 0000000000..19d16d425e --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramChannelFactory.java @@ -0,0 +1,110 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2008, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.channel.socket.oio; + +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.socket.DatagramChannel; +import org.jboss.netty.channel.socket.DatagramChannelFactory; +import org.jboss.netty.util.ExecutorUtil; + +/** + * A {@link DatagramChannelFactory} which creates a client-side blocking + * I/O based {@link DatagramChannel}. It utilizes the good old blocking I/O API + * which is known to yield better throughput and latency when there are + * relatively small number of connections to serve. + * + *

How threads work

+ *

+ * There is only one type of threads in {@link OioDatagramChannelFactory}; + * worker threads. + * + *

Worker threads

+ *

+ * Each {@link Channel} has a dedicated worker thread, just like a + * traditional blocking I/O thread model. + * + *

Life cycle of threads and graceful shutdown

+ *

+ * Worker threads are acquired from the {@link Executor} which was specified + * when a {@link OioDatagramChannelFactory} was created (i.e. {@code workerExecutor}.) + * Therefore, you should make sure the specified {@link Executor} is able to + * lend the sufficient number of threads. + *

+ * Worker threads are acquired lazily, and then released when there's nothing + * left to process. All the related resources are also released when the + * worker threads are released. Therefore, to shut down a service gracefully, + * you should do the following: + * + *

    + *
  1. close all channels created by the factory, and
  2. + *
  3. call {@link #releaseExternalResources()}.
  4. + *
+ * + * Please make sure not to shut down the executor until all channels are + * closed. Otherwise, you will end up with a {@link RejectedExecutionException} + * and the related resources might not be released properly. + * + *

Limitation

+ *

+ * A {@link DatagramChannel} created by this factory does not support asynchronous + * operations. Any I/O requests such as {@code "write"} will be performed in a + * blocking manner. + * + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * + * @version $Rev$, $Date$ + * + * @apiviz.landmark + */ +public class OioDatagramChannelFactory implements DatagramChannelFactory { + + private final Executor workerExecutor; + final OioDatagramPipelineSink sink; + + /** + * Creates a new instance. + * + * @param workerExecutor + * the {@link Executor} which will execute the I/O worker threads + */ + public OioDatagramChannelFactory(Executor workerExecutor) { + if (workerExecutor == null) { + throw new NullPointerException("workerExecutor"); + } + this.workerExecutor = workerExecutor; + sink = new OioDatagramPipelineSink(workerExecutor); + } + + public DatagramChannel newChannel(ChannelPipeline pipeline) { + return new OioDatagramChannel(this, pipeline, sink); + } + + public void releaseExternalResources() { + ExecutorUtil.terminate(workerExecutor); + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramPipelineSink.java new file mode 100644 index 0000000000..be8950ccb6 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramPipelineSink.java @@ -0,0 +1,176 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2008, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.channel.socket.oio; + +import static org.jboss.netty.channel.Channels.*; + +import java.net.SocketAddress; +import java.util.concurrent.Executor; + +import org.jboss.netty.channel.AbstractChannelSink; +import org.jboss.netty.channel.ChannelEvent; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelState; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.util.ThreadRenamingRunnable; + +/** + * + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * + * @version $Rev$, $Date$ + * + */ +class OioDatagramPipelineSink extends AbstractChannelSink { + + private final Executor workerExecutor; + + OioDatagramPipelineSink(Executor workerExecutor) { + this.workerExecutor = workerExecutor; + } + + public void eventSunk( + ChannelPipeline pipeline, ChannelEvent e) throws Exception { + OioDatagramChannel channel = (OioDatagramChannel) e.getChannel(); + ChannelFuture future = e.getFuture(); + if (e instanceof ChannelStateEvent) { + ChannelStateEvent stateEvent = (ChannelStateEvent) e; + ChannelState state = stateEvent.getState(); + Object value = stateEvent.getValue(); + switch (state) { + case OPEN: + if (Boolean.FALSE.equals(value)) { + OioDatagramWorker.close(channel, future); + } + break; + case BOUND: + if (value != null) { + bind(channel, future, (SocketAddress) value); + } else { + OioDatagramWorker.close(channel, future); + } + break; + case CONNECTED: + if (value != null) { + connect(channel, future, (SocketAddress) value); + } else { + OioDatagramWorker.disconnect(channel, future); + } + break; + case INTEREST_OPS: + OioDatagramWorker.setInterestOps(channel, future, ((Integer) value).intValue()); + break; + } + } else if (e instanceof MessageEvent) { + MessageEvent evt = (MessageEvent) e; + OioDatagramWorker.write( + channel, future, evt.getMessage(), evt.getRemoteAddress()); + } + } + + private void bind( + OioDatagramChannel channel, ChannelFuture future, + SocketAddress localAddress) { + boolean bound = false; + boolean workerStarted = false; + try { + channel.socket.bind(localAddress); + bound = true; + + // Fire events + future.setSuccess(); + fireChannelBound(channel, channel.getLocalAddress()); + + // Start the business. + workerExecutor.execute(new ThreadRenamingRunnable( + new OioDatagramWorker(channel), + "Old I/O datagram worker (channelId: " + channel.getId() + ", " + + channel.getLocalAddress() + ')')); + + workerStarted = true; + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } finally { + if (bound && !workerStarted) { + OioDatagramWorker.close(channel, future); + } + } + } + + private void connect( + OioDatagramChannel channel, ChannelFuture future, + SocketAddress remoteAddress) { + + boolean bound = channel.isBound(); + boolean connected = false; + boolean workerStarted = false; + + future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + + try { + channel.socket.connect(remoteAddress); + connected = true; + + // Fire events. + future.setSuccess(); + if (!bound) { + fireChannelBound(channel, channel.getLocalAddress()); + } + fireChannelConnected(channel, channel.getRemoteAddress()); + + String threadName = + "Old I/O datagram worker (channelId: " + channel.getId() + ", " + + channel.getLocalAddress() + " => " + + channel.getRemoteAddress() + ')'; + if (!bound) { + // Start the business. + workerExecutor.execute(new ThreadRenamingRunnable( + new OioDatagramWorker(channel), threadName)); + } else { + // Worker started by bind() - just rename. + Thread workerThread = channel.workerThread; + if (workerThread != null) { + try { + workerThread.setName(threadName); + } catch (SecurityException e) { + // Ignore. + } + } + } + + workerStarted = true; + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } finally { + if (connected && !workerStarted) { + OioDatagramWorker.close(channel, future); + } + } + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramWorker.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramWorker.java new file mode 100644 index 0000000000..c52c63c568 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramWorker.java @@ -0,0 +1,210 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2008, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.channel.socket.oio; + +import static org.jboss.netty.channel.Channels.*; + +import java.net.DatagramPacket; +import java.net.MulticastSocket; +import java.net.SocketAddress; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ReceiveBufferSizePredictor; + +/** + * + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * + * @version $Rev$, $Date$ + * + */ +class OioDatagramWorker implements Runnable { + + private final OioDatagramChannel channel; + + OioDatagramWorker(OioDatagramChannel channel) { + this.channel = channel; + } + + public void run() { + channel.workerThread = Thread.currentThread(); + final MulticastSocket socket = channel.socket; + + while (channel.isOpen()) { + synchronized (this) { + while (!channel.isReadable()) { + try { + // notify() is not called at all. + // close() and setInterestOps() calls Thread.interrupt() + this.wait(); + } catch (InterruptedException e) { + if (!channel.isOpen()) { + break; + } + } + } + } + + ReceiveBufferSizePredictor predictor = + channel.getConfig().getReceiveBufferSizePredictor(); + + byte[] buf = new byte[predictor.nextReceiveBufferSize()]; + DatagramPacket packet = new DatagramPacket(buf, buf.length); + try { + socket.receive(packet); + } catch (Throwable t) { + if (!channel.socket.isClosed()) { + fireExceptionCaught(channel, t); + } + break; + } + + ChannelBuffer buffer; + int readBytes = packet.getLength(); + if (readBytes == buf.length) { + buffer = ChannelBuffers.wrappedBuffer(buf); + } else { + buffer = ChannelBuffers.wrappedBuffer(buf, 0, readBytes); + } + + fireMessageReceived(channel, buffer, packet.getSocketAddress()); + } + + // Setting the workerThread to null will prevent any channel + // operations from interrupting this thread from now on. + channel.workerThread = null; + + // Clean up. + close(channel, succeededFuture(channel)); + } + + static void write( + OioDatagramChannel channel, ChannelFuture future, + Object message, SocketAddress remoteAddress) { + try { + // FIXME: Avoid extra copy. + ChannelBuffer a = (ChannelBuffer) message; + byte[] b = new byte[a.readableBytes()]; + a.getBytes(0, b); + channel.socket.send(new DatagramPacket(b, b.length, remoteAddress)); + fireWriteComplete(channel, b.length); + future.setSuccess(); + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + + static void setInterestOps( + OioDatagramChannel channel, ChannelFuture future, int interestOps) { + + // Override OP_WRITE flag - a user cannot change this flag. + interestOps &= ~Channel.OP_WRITE; + interestOps |= channel.getInterestOps() & Channel.OP_WRITE; + + boolean changed = false; + try { + if (channel.getInterestOps() != interestOps) { + if ((interestOps & Channel.OP_READ) != 0) { + channel.setInterestOpsNow(Channel.OP_READ); + } else { + channel.setInterestOpsNow(Channel.OP_NONE); + } + changed = true; + } + + future.setSuccess(); + if (changed) { + // Notify the worker so it stops or continues reading. + Thread currentThread = Thread.currentThread(); + Thread workerThread = channel.workerThread; + if (workerThread != null && currentThread != workerThread) { + workerThread.interrupt(); + } + + channel.setInterestOpsNow(interestOps); + fireChannelInterestChanged(channel); + } + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + + static void disconnect(OioDatagramChannel channel, ChannelFuture future) { + boolean connected = channel.isConnected(); + try { + channel.socket.disconnect(); + future.setSuccess(); + if (connected) { + fireChannelDisconnected(channel); + } + + Thread workerThread = channel.workerThread; + if (workerThread != null) { + try { + workerThread.setName( + "Old I/O datagram worker (channelId: " + + channel.getId() + ", " + channel.getLocalAddress() + ')'); + } catch (SecurityException e) { + // Ignore. + } + } + + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + + static void close(OioDatagramChannel channel, ChannelFuture future) { + boolean connected = channel.isConnected(); + boolean bound = channel.isBound(); + try { + channel.socket.close(); + future.setSuccess(); + if (channel.setClosed()) { + if (connected) { + // Notify the worker so it stops reading. + Thread currentThread = Thread.currentThread(); + Thread workerThread = channel.workerThread; + if (workerThread != null && currentThread != workerThread) { + workerThread.interrupt(); + } + fireChannelDisconnected(channel); + } + if (bound) { + fireChannelUnbound(channel); + } + fireChannelClosed(channel); + } + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } +} \ No newline at end of file diff --git a/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentClient.java b/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentClient.java new file mode 100644 index 0000000000..ac32d247c5 --- /dev/null +++ b/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentClient.java @@ -0,0 +1,69 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2009, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.example.qotm; + +import java.net.InetSocketAddress; +import java.util.concurrent.Executors; + +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.socket.DatagramChannel; +import org.jboss.netty.channel.socket.DatagramChannelFactory; +import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory; +import org.jboss.netty.handler.codec.string.StringDecoder; +import org.jboss.netty.handler.codec.string.StringEncoder; + +/** + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * @version $Rev$, $Date$ + */ +public class QuoteOfTheMomentClient { + + public static void main(String[] args) throws Exception { + DatagramChannelFactory f = + new OioDatagramChannelFactory(Executors.newCachedThreadPool()); + + ChannelPipeline p = Channels.pipeline(); + p.addLast("encoder", new StringEncoder("UTF-8")); + p.addLast("decoder", new StringDecoder("UTF-8")); + p.addLast("handler", new QuoteOfTheMomentClientHandler()); + + DatagramChannel c = f.newChannel(p); + c.getConfig().setBroadcast(true); + c.bind(new InetSocketAddress(0)).awaitUninterruptibly(); + + // Broadcast the QOTM request to port 8080. + c.write("QOTM?", new InetSocketAddress("255.255.255.255", 8080)); + + // QuoteOfTheMomentClientHandler will close the DatagramChannel when a + // response is received. If the channel is not closed within 5 seconds, + // print an error message and quit. + if (!c.getCloseFuture().awaitUninterruptibly(5000)) { + System.err.println("QOTM request timed out."); + c.close().awaitUninterruptibly(); + } + + f.releaseExternalResources(); + } +} diff --git a/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentClientHandler.java b/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentClientHandler.java new file mode 100644 index 0000000000..f9ae6f595d --- /dev/null +++ b/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentClientHandler.java @@ -0,0 +1,55 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2009, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.example.qotm; + +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipelineCoverage; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; + +/** + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * @version $Rev$, $Date$ + */ +@ChannelPipelineCoverage("all") +public class QuoteOfTheMomentClientHandler extends SimpleChannelUpstreamHandler { + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { + String msg = (String) e.getMessage(); + if (msg.startsWith("QOTM: ")) { + System.out.println("Quote of the Moment: " + msg.substring(6)); + e.getChannel().close(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + throws Exception { + e.getCause().printStackTrace(); + e.getChannel().close(); + } +} diff --git a/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentServer.java b/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentServer.java new file mode 100644 index 0000000000..122e825334 --- /dev/null +++ b/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentServer.java @@ -0,0 +1,56 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2009, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.example.qotm; + +import java.net.InetSocketAddress; +import java.util.concurrent.Executors; + +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.socket.DatagramChannel; +import org.jboss.netty.channel.socket.DatagramChannelFactory; +import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory; +import org.jboss.netty.handler.codec.string.StringDecoder; +import org.jboss.netty.handler.codec.string.StringEncoder; + +/** + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * @version $Rev$, $Date$ + */ +public class QuoteOfTheMomentServer { + + public static void main(String[] args) throws Exception { + DatagramChannelFactory f = + new OioDatagramChannelFactory(Executors.newCachedThreadPool()); + + ChannelPipeline p = Channels.pipeline(); + p.addLast("encoder", new StringEncoder("UTF-8")); + p.addLast("decoder", new StringDecoder("UTF-8")); + p.addLast("handler", new QuoteOfTheMomentServerHandler()); + + DatagramChannel c = f.newChannel(p); + c.getConfig().setBroadcast(false); + c.bind(new InetSocketAddress(8080)).awaitUninterruptibly(); + } +} diff --git a/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentServerHandler.java b/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentServerHandler.java new file mode 100644 index 0000000000..c7b5ab3d7f --- /dev/null +++ b/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentServerHandler.java @@ -0,0 +1,74 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2009, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.example.qotm; + +import java.util.Random; + +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipelineCoverage; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; + +/** + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * @version $Rev$, $Date$ + */ +@ChannelPipelineCoverage("all") +public class QuoteOfTheMomentServerHandler extends SimpleChannelUpstreamHandler { + + private static final Random random = new Random(); + + // Quotes from Mohandas K. Gandhi: + private static final String[] quotes = new String[] { + "Where there is love there is life.", + "First they ignore you, then they laugh at you, then they fight you, then you win.", + "Be the change you want to see in the world.", + "The weak can never forgive. Forgiveness is the attribute of the strong.", + }; + + private static String nextQuote() { + int quoteId; + synchronized (random) { + quoteId = random.nextInt(quotes.length); + } + return quotes[quoteId]; + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { + String msg = (String) e.getMessage(); + if (msg.equals("QOTM?")) { + e.getChannel().write("QOTM: " + nextQuote(), e.getRemoteAddress()); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + throws Exception { + e.getCause().printStackTrace(); + // We don't close the channel because we can keep serving requests. + } +}