From 9c1f3c6fe89d289b61124de199497ff8c0cab20a Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 2 Apr 2012 21:02:41 +0200 Subject: [PATCH] Fix UDP nio impl and add simple tests --- .../socket/AbstractDatagramTest.java | 88 ++++++++++++++ .../socket/nio/nio/NioNioDatagramTest.java | 36 ++++++ .../socket/nio/oio/NioOioDatagramTest.java | 37 ++++++ .../socket/oio/nio/OioNioDatagramTest.java | 36 ++++++ .../socket/oio/oio/OioOioDatagramTest.java | 36 ++++++ .../channel/socket/nio/AbstractNioWorker.java | 2 +- .../channel/socket/nio/NioDatagramWorker.java | 110 ++++++++++++++++++ 7 files changed, 344 insertions(+), 1 deletion(-) create mode 100644 testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractDatagramTest.java create mode 100644 testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/nio/NioNioDatagramTest.java create mode 100644 testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/oio/NioOioDatagramTest.java create mode 100644 testsuite/src/test/java/io/netty/testsuite/transport/socket/oio/nio/OioNioDatagramTest.java create mode 100644 testsuite/src/test/java/io/netty/testsuite/transport/socket/oio/oio/OioOioDatagramTest.java diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractDatagramTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractDatagramTest.java new file mode 100644 index 0000000000..e54bf00acf --- /dev/null +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractDatagramTest.java @@ -0,0 +1,88 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import static org.junit.Assert.assertTrue; +import io.netty.bootstrap.ConnectionlessBootstrap; +import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBuffers; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.MessageEvent; +import io.netty.channel.SimpleChannelUpstreamHandler; +import io.netty.channel.socket.DatagramChannelFactory; +import io.netty.util.internal.ExecutorUtil; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public abstract class AbstractDatagramTest { + + private static ExecutorService executor; + + + @BeforeClass + public static void init() { + executor = Executors.newCachedThreadPool(); + } + + @AfterClass + public static void destroy() { + ExecutorUtil.terminate(executor); + } + + protected abstract DatagramChannelFactory newServerSocketChannelFactory(Executor executor); + protected abstract DatagramChannelFactory newClientSocketChannelFactory(Executor executor); + + @Test + public void testSimpleSend() throws Throwable { + ConnectionlessBootstrap sb = new ConnectionlessBootstrap(newServerSocketChannelFactory(executor)); + ConnectionlessBootstrap cb = new ConnectionlessBootstrap(newClientSocketChannelFactory(executor)); + + final CountDownLatch latch = new CountDownLatch(1); + sb.getPipeline().addFirst("handler", new SimpleChannelUpstreamHandler() { + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + super.messageReceived(ctx, e); + Assert.assertEquals(1,((ChannelBuffer)e.getMessage()).readInt()); + + latch.countDown(); + } + + }); + cb.getPipeline().addFirst("handler", new SimpleChannelUpstreamHandler()); + + Channel sc = sb.bind(new InetSocketAddress(0)); + + Channel cc = cb.bind(new InetSocketAddress(0)); + cc.write(ChannelBuffers.wrapInt(1), sc.getLocalAddress()); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + sc.close().awaitUninterruptibly(); + cc.close().awaitUninterruptibly(); + + } +} diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/nio/NioNioDatagramTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/nio/NioNioDatagramTest.java new file mode 100644 index 0000000000..a9953878a5 --- /dev/null +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/nio/NioNioDatagramTest.java @@ -0,0 +1,36 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket.nio.nio; + +import java.util.concurrent.Executor; + +import io.netty.channel.socket.DatagramChannelFactory; +import io.netty.channel.socket.nio.NioDatagramChannelFactory; +import io.netty.testsuite.transport.socket.AbstractDatagramTest; + +public class NioNioDatagramTest extends AbstractDatagramTest{ + + @Override + protected DatagramChannelFactory newServerSocketChannelFactory(Executor executor) { + return new NioDatagramChannelFactory(executor); + } + + @Override + protected DatagramChannelFactory newClientSocketChannelFactory(Executor executor) { + return new NioDatagramChannelFactory(executor); + } + +} diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/oio/NioOioDatagramTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/oio/NioOioDatagramTest.java new file mode 100644 index 0000000000..f843c06733 --- /dev/null +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/oio/NioOioDatagramTest.java @@ -0,0 +1,37 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket.nio.oio; + +import java.util.concurrent.Executor; + +import io.netty.channel.socket.DatagramChannelFactory; +import io.netty.channel.socket.nio.NioDatagramChannelFactory; +import io.netty.channel.socket.oio.OioDatagramChannelFactory; +import io.netty.testsuite.transport.socket.AbstractDatagramTest; + +public class NioOioDatagramTest extends AbstractDatagramTest{ + + @Override + protected DatagramChannelFactory newServerSocketChannelFactory(Executor executor) { + return new OioDatagramChannelFactory(executor); + } + + @Override + protected DatagramChannelFactory newClientSocketChannelFactory(Executor executor) { + return new NioDatagramChannelFactory(executor); + } + +} diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/oio/nio/OioNioDatagramTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/oio/nio/OioNioDatagramTest.java new file mode 100644 index 0000000000..0d231d9313 --- /dev/null +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/oio/nio/OioNioDatagramTest.java @@ -0,0 +1,36 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket.oio.nio; + +import java.util.concurrent.Executor; + +import io.netty.channel.socket.DatagramChannelFactory; +import io.netty.channel.socket.oio.OioDatagramChannelFactory; +import io.netty.testsuite.transport.socket.AbstractDatagramTest; + +public class OioNioDatagramTest extends AbstractDatagramTest{ + + @Override + protected DatagramChannelFactory newServerSocketChannelFactory(Executor executor) { + return new OioDatagramChannelFactory(executor); + } + + @Override + protected DatagramChannelFactory newClientSocketChannelFactory(Executor executor) { + return new OioDatagramChannelFactory(executor); + } + +} diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/oio/oio/OioOioDatagramTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/oio/oio/OioOioDatagramTest.java new file mode 100644 index 0000000000..edf4f9c94d --- /dev/null +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/oio/oio/OioOioDatagramTest.java @@ -0,0 +1,36 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket.oio.oio; + +import java.util.concurrent.Executor; + +import io.netty.channel.socket.DatagramChannelFactory; +import io.netty.channel.socket.oio.OioDatagramChannelFactory; +import io.netty.testsuite.transport.socket.AbstractDatagramTest; + +public class OioOioDatagramTest extends AbstractDatagramTest{ + + @Override + protected DatagramChannelFactory newServerSocketChannelFactory(Executor executor) { + return new OioDatagramChannelFactory(executor); + } + + @Override + protected DatagramChannelFactory newClientSocketChannelFactory(Executor executor) { + return new OioDatagramChannelFactory(executor); + } + +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 1bc1f1e051..c65b6ff451 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -116,7 +116,7 @@ abstract class AbstractNioWorker implements Worker { private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation - private final SendBufferPool sendBufferPool = new SendBufferPool(); + protected final SendBufferPool sendBufferPool = new SendBufferPool(); private final boolean allowShutdownOnIdle; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java index 1806cc749e..712b21fa99 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java @@ -24,14 +24,19 @@ import static io.netty.channel.Channels.succeededFuture; import io.netty.buffer.ChannelBufferFactory; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; +import io.netty.channel.Channels; +import io.netty.channel.MessageEvent; import io.netty.channel.ReceiveBufferSizePredictor; +import io.netty.channel.socket.nio.SendBufferPool.SendBuffer; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousCloseException; import java.nio.channels.ClosedChannelException; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.util.Queue; import java.util.concurrent.Executor; /** @@ -187,6 +192,111 @@ public class NioDatagramWorker extends AbstractNioWorker { write0(channel); } + + @Override + protected void write0(final AbstractNioChannel channel) { + + boolean addOpWrite = false; + boolean removeOpWrite = false; + + long writtenBytes = 0; + + final SendBufferPool sendBufferPool = this.sendBufferPool; + final DatagramChannel ch = ((NioDatagramChannel) channel).getJdkChannel().getChannel(); + final Queue writeBuffer = channel.writeBufferQueue; + final int writeSpinCount = channel.getConfig().getWriteSpinCount(); + synchronized (channel.writeLock) { + // inform the channel that write is in-progress + channel.inWriteNowLoop = true; + + // loop forever... + for (;;) { + MessageEvent evt = channel.currentWriteEvent; + SendBuffer buf; + if (evt == null) { + if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) { + removeOpWrite = true; + channel.writeSuspended = false; + break; + } + + channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage()); + } else { + buf = channel.currentWriteBuffer; + } + + try { + long localWrittenBytes = 0; + SocketAddress raddr = evt.getRemoteAddress(); + if (raddr == null) { + for (int i = writeSpinCount; i > 0; i --) { + localWrittenBytes = buf.transferTo(ch); + if (localWrittenBytes != 0) { + writtenBytes += localWrittenBytes; + break; + } + if (buf.finished()) { + break; + } + } + } else { + for (int i = writeSpinCount; i > 0; i --) { + localWrittenBytes = buf.transferTo(ch, raddr); + if (localWrittenBytes != 0) { + writtenBytes += localWrittenBytes; + break; + } + if (buf.finished()) { + break; + } + } + } + + if (localWrittenBytes > 0 || buf.finished()) { + // Successful write - proceed to the next message. + buf.release(); + ChannelFuture future = evt.getFuture(); + channel.currentWriteEvent = null; + channel.currentWriteBuffer = null; + evt = null; + buf = null; + future.setSuccess(); + } else { + // Not written at all - perhaps the kernel buffer is full. + addOpWrite = true; + channel.writeSuspended = true; + break; + } + } catch (final AsynchronousCloseException e) { + // Doesn't need a user attention - ignore. + } catch (final Throwable t) { + buf.release(); + ChannelFuture future = evt.getFuture(); + channel.currentWriteEvent = null; + channel.currentWriteBuffer = null; + buf = null; + evt = null; + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + channel.inWriteNowLoop = false; + + // Initially, the following block was executed after releasing + // the writeLock, but there was a race condition, and it has to be + // executed before releasing the writeLock: + // + // https://issues.jboss.org/browse/NETTY-410 + // + if (addOpWrite) { + setOpWrite(channel); + } else if (removeOpWrite) { + clearOpWrite(channel); + } + } + + Channels.fireWriteComplete(channel, writtenBytes); + }