From 899b16678f4a86354736c444b9c214b8ae055a4b Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Thu, 11 Jun 2009 06:10:46 +0000 Subject: [PATCH] * Merged recent changes in the trunk to the NIO UDP transport * Other miscellaneous modifications like typo fix --- .../socket/nio/NioDatagramChannel.java | 156 ++++++++++-------- .../socket/nio/NioDatagramChannelFactory.java | 11 +- .../socket/nio/NioDatagramPipelineSink.java | 15 +- .../channel/socket/nio/NioUdpWorker.java | 156 ++++++++++-------- .../socket/nio/NioDatagramChannelTest.java | 9 +- 5 files changed, 199 insertions(+), 148 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java index a52e47c043..4d7c3e42ee 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java @@ -25,7 +25,9 @@ package org.jboss.netty.channel.socket.nio; import static org.jboss.netty.channel.Channels.*; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.NetworkInterface; import java.net.SocketAddress; import java.nio.channels.DatagramChannel; import java.util.Queue; @@ -34,16 +36,14 @@ import java.util.concurrent.atomic.AtomicInteger; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.AbstractChannel; +import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.ServerChannel; import org.jboss.netty.channel.socket.DatagramChannelConfig; -import org.jboss.netty.logging.InternalLogger; -import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.internal.LinkedTransferQueue; import org.jboss.netty.util.internal.ThreadLocalBoolean; @@ -56,13 +56,8 @@ import org.jboss.netty.util.internal.ThreadLocalBoolean; * * @version $Rev$, $Date$ */ -public class NioDatagramChannel extends AbstractChannel implements - ServerChannel { - /** - * Internal Netty logger. - */ - private static final InternalLogger logger = InternalLoggerFactory - .getInstance(NioDatagramChannel.class); +class NioDatagramChannel extends AbstractChannel + implements org.jboss.netty.channel.socket.DatagramChannel { /** * The {@link DatagramChannelConfig}. @@ -77,7 +72,7 @@ public class NioDatagramChannel extends AbstractChannel implements /** * The {@link DatagramChannel} that this channel uses. */ - private final DatagramChannel datagramChannel; + private final java.nio.channels.DatagramChannel datagramChannel; /** * Monitor object to synchronize access to InterestedOps. @@ -120,30 +115,17 @@ public class NioDatagramChannel extends AbstractChannel implements */ MessageEvent currentWriteEvent; - /** - * The current write index. - */ - int currentWriteIndex; - /** * Boolean that indicates that write operation is in progress. */ volatile boolean inWriteNowLoop; - /** - * - * @param factory - * @param pipeline - * @param sink - * @param worker - */ - public NioDatagramChannel(final ChannelFactory factory, + NioDatagramChannel(final ChannelFactory factory, final ChannelPipeline pipeline, final ChannelSink sink, final NioUdpWorker worker) { super(null, factory, pipeline, sink); this.worker = worker; datagramChannel = openNonBlockingChannel(); - setSoTimeout(1000); config = new DefaultNioDatagramChannelConfig(datagramChannel.socket()); fireChannelOpen(this); @@ -159,27 +141,22 @@ public class NioDatagramChannel extends AbstractChannel implements } } - private void setSoTimeout(final int timeout) { + public InetSocketAddress getLocalAddress() { try { - datagramChannel.socket().setSoTimeout(timeout); - } catch (final IOException e) { - try { - datagramChannel.close(); - } catch (final IOException e2) { - logger.warn("Failed to close a partially DatagramSocket.", e2); - } - throw new ChannelException( - "Failed to set the DatagramSocket timeout.", e); + return (InetSocketAddress) datagramChannel.socket().getLocalSocketAddress(); + } catch (Throwable t) { + // Sometimes fails on a closed socket in Windows. + return null; } } - public InetSocketAddress getLocalAddress() { - return (InetSocketAddress) datagramChannel.socket() - .getLocalSocketAddress(); - } - - public SocketAddress getRemoteAddress() { - return datagramChannel.socket().getRemoteSocketAddress(); + public InetSocketAddress getRemoteAddress() { + try { + return (InetSocketAddress) datagramChannel.socket().getRemoteSocketAddress(); + } catch (Throwable t) { + // Sometimes fails on a closed socket in Windows. + return null; + } } public boolean isBound() { @@ -187,7 +164,7 @@ public class NioDatagramChannel extends AbstractChannel implements } public boolean isConnected() { - return datagramChannel.socket().isBound(); + return datagramChannel.socket().isConnected(); } @Override @@ -195,11 +172,6 @@ public class NioDatagramChannel extends AbstractChannel implements return super.setClosed(); } - @Override - protected ChannelFuture getSucceededFuture() { - return super.getSucceededFuture(); - } - public NioDatagramChannelConfig getConfig() { return config; } @@ -208,6 +180,37 @@ public class NioDatagramChannel extends AbstractChannel implements return datagramChannel; } + @Override + public int getInterestOps() { + if (!isOpen()) { + return Channel.OP_WRITE; + } + + int interestOps = getRawInterestOps(); + int writeBufferSize = this.writeBufferSize.get(); + if (writeBufferSize != 0) { + if (highWaterMarkCounter.get() > 0) { + int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); + if (writeBufferSize >= lowWaterMark) { + interestOps |= Channel.OP_WRITE; + } else { + interestOps &= ~Channel.OP_WRITE; + } + } else { + int highWaterMark = getConfig().getWriteBufferHighWaterMark(); + if (writeBufferSize >= highWaterMark) { + interestOps |= Channel.OP_WRITE; + } else { + interestOps &= ~Channel.OP_WRITE; + } + } + } else { + interestOps &= ~Channel.OP_WRITE; + } + + return interestOps; + } + int getRawInterestOps() { return super.getInterestOps(); } @@ -216,6 +219,15 @@ public class NioDatagramChannel extends AbstractChannel implements super.setInterestOpsNow(interestOps); } + @Override + public ChannelFuture write(Object message, SocketAddress remoteAddress) { + if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) { + return super.write(message, null); + } else { + return super.write(message, remoteAddress); + } + } + /** * WriteBuffer is an extension of {@link LinkedTransferQueue} that adds * support for highWaterMark checking of the write buffer size. @@ -233,21 +245,17 @@ public class NioDatagramChannel extends AbstractChannel implements * adds support for keeping track of the size of the this write buffer. */ @Override - public boolean offer(final MessageEvent e) { - final boolean success = super.offer(e); + public boolean offer(MessageEvent e) { + boolean success = super.offer(e); assert success; - final int messageSize = ((ChannelBuffer) e.getMessage()) - .readableBytes(); + int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes(); + int newWriteBufferSize = writeBufferSize.addAndGet(messageSize); + int highWaterMark = getConfig().getWriteBufferHighWaterMark(); - final int newWriteBufferSize = writeBufferSize - .addAndGet(messageSize); - - final int highWaterMark = getConfig().getWriteBufferHighWaterMark(); if (newWriteBufferSize >= highWaterMark) { if (newWriteBufferSize - messageSize < highWaterMark) { highWaterMarkCounter.incrementAndGet(); - if (!notifying.get()) { notifying.set(Boolean.TRUE); fireChannelInterestChanged(NioDatagramChannel.this); @@ -255,7 +263,6 @@ public class NioDatagramChannel extends AbstractChannel implements } } } - return true; } @@ -265,18 +272,13 @@ public class NioDatagramChannel extends AbstractChannel implements */ @Override public MessageEvent poll() { - final MessageEvent e = super.poll(); + MessageEvent e = super.poll(); if (e != null) { - final int messageSize = ((ChannelBuffer) e.getMessage()) - .readableBytes(); - final int newWriteBufferSize = writeBufferSize - .addAndGet(-messageSize); + int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes(); + int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize); + int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); - final int lowWaterMark = getConfig() - .getWriteBufferLowWaterMark(); - - if (newWriteBufferSize == 0 || - newWriteBufferSize < lowWaterMark) { + if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) { if (newWriteBufferSize + messageSize >= lowWaterMark) { highWaterMarkCounter.decrementAndGet(); if (!notifying.get()) { @@ -305,4 +307,22 @@ public class NioDatagramChannel extends AbstractChannel implements NioUdpWorker.write(NioDatagramChannel.this, false); } } + + public void joinGroup(InetAddress multicastAddress) { + throw new UnsupportedOperationException(); + } + + public void joinGroup(InetSocketAddress multicastAddress, + NetworkInterface networkInterface) { + throw new UnsupportedOperationException(); + } + + public void leaveGroup(InetAddress multicastAddress) { + throw new UnsupportedOperationException(); + } + + public void leaveGroup(InetSocketAddress multicastAddress, + NetworkInterface networkInterface) { + throw new UnsupportedOperationException(); + } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java index 12690e0582..75eefc1fd2 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java @@ -25,14 +25,14 @@ package org.jboss.netty.channel.socket.nio; import java.util.concurrent.Executor; import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ServerChannelFactory; +import org.jboss.netty.channel.socket.DatagramChannel; +import org.jboss.netty.channel.socket.DatagramChannelFactory; import org.jboss.netty.util.internal.ExecutorUtil; /** * A {@link NioDatagramChannelFactory} creates a server-side NIO-based - * {@link NioDatagramChannel}. It utilizes the non-blocking I/O mode which + * {@link DatagramChannel}. It utilizes the non-blocking I/O mode which * was introduced with NIO to serve many number of concurrent connections * efficiently. * @@ -59,8 +59,7 @@ import org.jboss.netty.util.internal.ExecutorUtil; * * @version $Rev$, $Date$ */ -public class NioDatagramChannelFactory implements ChannelFactory, - ServerChannelFactory { +public class NioDatagramChannelFactory implements DatagramChannelFactory { /** * */ @@ -102,7 +101,7 @@ public class NioDatagramChannelFactory implements ChannelFactory, sink = new NioDatagramPipelineSink(workerExecutor, workerCount); } - public NioDatagramChannel newChannel(final ChannelPipeline pipeline) { + public DatagramChannel newChannel(final ChannelPipeline pipeline) { return new NioDatagramChannel(this, pipeline, sink, sink.nextWorker()); } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java index a94afcd5e0..2eef16ac08 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java @@ -47,7 +47,7 @@ import org.jboss.netty.channel.MessageEvent; * * @version $Rev$, $Date$ */ -public class NioDatagramPipelineSink extends AbstractChannelSink { +class NioDatagramPipelineSink extends AbstractChannelSink { private static final AtomicInteger nextId = new AtomicInteger(); @@ -74,8 +74,9 @@ public class NioDatagramPipelineSink extends AbstractChannelSink { /** * Handle downstream event. * - * @param pipeline The channelpiple line that passed down the downstream event. - * @param event The downstream event. + * @param pipeline the {@link ChannelPipeline} that passes down the + * downstream event. + * @param e The downstream event. */ public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { @@ -98,6 +99,14 @@ public class NioDatagramPipelineSink extends AbstractChannelSink { NioUdpWorker.close(channel, future); } break; + case CONNECTED: + // TODO Implement me + if (value != null) { + //connect(channel, future, (SocketAddress) value); + } else { + //NioUdpWorker.disconnect(channel, future); + } + break; case INTEREST_OPS: NioUdpWorker.setInterestOps(channel, future, ((Integer) value) .intValue()); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioUdpWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioUdpWorker.java index bf44bb808b..5558facd6c 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioUdpWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioUdpWorker.java @@ -49,6 +49,7 @@ import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.ReceiveBufferSizePredictor; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ThreadRenamingRunnable; @@ -71,12 +72,6 @@ class NioUdpWorker implements Runnable { private static final InternalLogger logger = InternalLoggerFactory .getInstance(NioUdpWorker.class); - /** - * Maximum packate size for UDP packets. - * 65,536-byte maximum size of an IP datagram minus the 20-byte size of the IP header and the 8-byte size of the UDP header. - */ - private static int MAX_PACKET_SIZE = 65507; - /** * This id of this worker. */ @@ -321,17 +316,21 @@ class NioUdpWorker implements Runnable { private static void processSelectedKeys(final Set selectedKeys) { for (Iterator i = selectedKeys.iterator(); i.hasNext();) { - final SelectionKey key = i.next(); + SelectionKey k = i.next(); i.remove(); try { - if (key.isReadable()) { - read(key); + int readyOps = k.readyOps(); + if ((readyOps & SelectionKey.OP_READ) != 0) { + if (!read(k)) { + // Connection already closed - no need to handle write. + continue; + } } - if (key.isWritable()) { - write(key); + if ((readyOps & SelectionKey.OP_WRITE) != 0) { + write(k); } - } catch (final CancelledKeyException ignore) { - close(key); + } catch (CancelledKeyException e) { + close(k); } } } @@ -347,40 +346,57 @@ class NioUdpWorker implements Runnable { * * @param key The selection key which contains the Selector registration information. */ - private static void read(final SelectionKey key) { - final NioDatagramChannel nioDatagramChannel = (NioDatagramChannel) key - .attachment(); + private static boolean read(final SelectionKey key) { + final NioDatagramChannel channel = (NioDatagramChannel) key.attachment(); + ReceiveBufferSizePredictor predictor = + channel.getConfig().getReceiveBufferSizePredictor(); - final DatagramChannel datagramChannel = (DatagramChannel) key.channel(); + final DatagramChannel nioChannel = (DatagramChannel) key.channel(); + + // Allocating a non-direct buffer with a max udp packge size. + // Would using a direct buffer be more efficient or would this negatively + // effect performance, as direct buffer allocation has a higher upfront cost + // where as a ByteBuffer is heap allocated. + final ByteBuffer byteBuffer = ByteBuffer.allocate(predictor.nextReceiveBufferSize()); + + boolean failure = true; + SocketAddress remoteAddress = null; try { - // Allocating a non-direct buffer with a max udp packge size. - // Would using a direct buffer be more efficient or would this negatively - // effect performance, as direct buffer allocation has a higher upfront cost - // where as a ByteBuffer is heap allocated. - final ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_PACKET_SIZE); - - // Recieve from the channel in a non blocking mode. We have already been notified that + // Receive from the channel in a non blocking mode. We have already been notified that // the channel is ready to receive. - final SocketAddress remoteAddress = datagramChannel - .receive(byteBuffer); + remoteAddress = nioChannel.receive(byteBuffer); + failure = false; + } catch (AsynchronousCloseException e) { + // Can happen, and does not need a user attention. + } catch (Throwable t) { + fireExceptionCaught(channel, t); + } + if (remoteAddress != null) { // Flip the buffer so that we can wrap it. byteBuffer.flip(); - // Create a Netty ChannelByffer by wrapping the ByteBuffer. - final ChannelBuffer channelBuffer = ChannelBuffers - .wrappedBuffer(byteBuffer); - logger.debug("ChannelBuffer : " + channelBuffer + - ", remoteAdress: " + remoteAddress); + int readBytes = byteBuffer.remaining(); + if (readBytes > 0) { + // Update the predictor. + predictor.previousReceiveBufferSize(readBytes); - // Notify the interested parties about the newly arrived message (channelBuffer). - fireMessageReceived(nioDatagramChannel, channelBuffer, - remoteAddress); - } catch (final Throwable t) { - if (!nioDatagramChannel.getDatagramChannel().socket().isClosed()) { - fireExceptionCaught(nioDatagramChannel, t); + // Create a Netty ChannelByffer by wrapping the ByteBuffer. + final ChannelBuffer channelBuffer = ChannelBuffers + .wrappedBuffer(byteBuffer); + + // Notify the interested parties about the newly arrived message (channelBuffer). + fireMessageReceived(channel, channelBuffer, + remoteAddress); } } + + if (failure) { + close(key); + return false; + } + + return true; } private static void close(SelectionKey k) { @@ -393,7 +409,7 @@ class NioUdpWorker implements Runnable { /* * Note that we are not checking if the channel is connected. Connected has a different * meaning in UDP and means that the channels socket is configured to only send and - * recieve from a given remote peer. + * receive from a given remote peer. */ if (!channel.isOpen()) { cleanUpWriteBuffer(channel); @@ -444,7 +460,6 @@ class NioUdpWorker implements Runnable { MessageEvent evt; ChannelBuffer buf; - int bufIdx; int writtenBytes = 0; Queue writeBuffer = channel.writeBufferQueue; @@ -465,33 +480,30 @@ class NioUdpWorker implements Runnable { } buf = (ChannelBuffer) evt.getMessage(); - bufIdx = buf.readerIndex(); } else { buf = (ChannelBuffer) evt.getMessage(); - bufIdx = channel.currentWriteIndex; } try { + int localWrittenBytes = 0; for (int i = writeSpinCount; i > 0; i --) { - ChannelBuffer buffer = (ChannelBuffer) evt.getMessage(); - int localWrittenBytes = channel.getDatagramChannel() - .send(buffer.toByteBuffer(), - evt.getRemoteAddress()); + localWrittenBytes = + channel.getDatagramChannel().send( + buf.toByteBuffer(), + evt.getRemoteAddress()); if (localWrittenBytes != 0) { - bufIdx += localWrittenBytes; writtenBytes += localWrittenBytes; break; } } - if (bufIdx == buf.writerIndex()) { + if (localWrittenBytes > 0) { // Successful write - proceed to the next message. evt.getFuture().setSuccess(); evt = null; } else { - // Not written fully - perhaps the kernel buffer is full. + // Not written at all - perhaps the kernel buffer is full. channel.currentWriteEvent = evt; - channel.currentWriteIndex = bufIdx; addOpWrite = true; break; } @@ -590,7 +602,7 @@ class NioUdpWorker implements Runnable { key.cancel(); } - boolean connected = channel.isOpen(); + boolean connected = channel.isConnected(); boolean bound = channel.isBound(); try { channel.getDatagramChannel().close(); @@ -615,33 +627,46 @@ class NioUdpWorker implements Runnable { } private static void cleanUpWriteBuffer(final NioDatagramChannel channel) { - // Create the exception only once to avoid the excessive overhead - // caused by fillStackTrace. - Exception cause; - if (channel.isOpen()) { - cause = new NotYetConnectedException(); - } else { - cause = new ClosedChannelException(); - } + Exception cause = null; // Clean up the stale messages in the write buffer. synchronized (channel.writeLock) { MessageEvent evt = channel.currentWriteEvent; if (evt != null) { channel.currentWriteEvent = null; - channel.currentWriteIndex = 0; + + // Create the exception only once to avoid the excessive overhead + // caused by fillStackTrace. + if (channel.isOpen()) { + cause = new NotYetConnectedException(); + } else { + cause = new ClosedChannelException(); + } evt.getFuture().setFailure(cause); + fireExceptionCaught(channel, cause); } Queue writeBuffer = channel.writeBufferQueue; - for (;;) { - evt = writeBuffer.poll(); - if (evt == null) { - break; + if (!writeBuffer.isEmpty()) { + // Create the exception only once to avoid the excessive overhead + // caused by fillStackTrace. + if (cause == null) { + if (channel.isOpen()) { + cause = new NotYetConnectedException(); + } else { + cause = new ClosedChannelException(); + } + } + + for (;;) { + evt = writeBuffer.poll(); + if (evt == null) { + break; + } + evt.getFuture().setFailure(cause); + fireExceptionCaught(channel, cause); } - evt.getFuture().setFailure(cause); - fireExceptionCaught(channel, cause); } } } @@ -767,6 +792,7 @@ class NioUdpWorker implements Runnable { throw new ChannelException( "Failed to register a socket to the selector.", e); } + // XXX fireChannelConnected(channel, localAddress); } } diff --git a/src/test/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelTest.java b/src/test/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelTest.java index 2f6ce9d194..e3caa22d8b 100644 --- a/src/test/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelTest.java +++ b/src/test/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelTest.java @@ -29,8 +29,7 @@ import java.net.DatagramPacket; import java.net.InetSocketAddress; import java.util.concurrent.Executors; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.bootstrap.ConnectionlessBootstrap; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; @@ -39,8 +38,6 @@ import org.junit.BeforeClass; import org.junit.Test; /** - * Unit test for {@link NioDatagramChannel} - * * @author The Netty Project (netty-dev@lists.jboss.org) * @author Daniel Bevenius (dbevenius@jboss.com) * @version $Rev$, $Date$ @@ -54,7 +51,7 @@ public class NioDatagramChannelTest { public static void setupChannel() { final NioDatagramChannelFactory channelFactory = new NioDatagramChannelFactory( Executors.newCachedThreadPool()); - final ServerBootstrap sb = new ServerBootstrap(channelFactory); + final ConnectionlessBootstrap sb = new ConnectionlessBootstrap(channelFactory); inetSocketAddress = new InetSocketAddress("localhost", 9999); sc = sb.bind(inetSocketAddress); final SimpleHandler handler = new SimpleHandler(); @@ -85,7 +82,7 @@ public class NioDatagramChannelTest { public void clientBootstrap() { final NioDatagramChannelFactory channelFactory = new NioDatagramChannelFactory( Executors.newCachedThreadPool()); - final ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); + final ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(channelFactory); bootstrap.getPipeline().addLast("test", new SimpleHandler()); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("keepAlive", true);