Fix UDP nio impl and add some tests for it

This commit is contained in:
norman 2012-04-03 07:44:28 +02:00
parent e73d9a60e7
commit 6995701f20
7 changed files with 344 additions and 3 deletions

View File

@ -112,7 +112,7 @@ abstract class AbstractNioWorker implements Worker {
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
private final boolean allowShutdownOnIdle;
@ -545,7 +545,7 @@ abstract class AbstractNioWorker implements Worker {
return Thread.currentThread() == channel.worker.thread;
}
private void setOpWrite(AbstractNioChannel<?> channel) {
protected void setOpWrite(AbstractNioChannel<?> channel) {
Selector selector = this.selector;
SelectionKey key = channel.channel.keyFor(selector);
if (key == null) {
@ -568,7 +568,7 @@ abstract class AbstractNioWorker implements Worker {
}
}
private void clearOpWrite(AbstractNioChannel<?> channel) {
protected void clearOpWrite(AbstractNioChannel<?> channel) {
Selector selector = this.selector;
SelectionKey key = channel.channel.keyFor(selector);
if (key == null) {

View File

@ -24,14 +24,18 @@ import static org.jboss.netty.channel.Channels.succeededFuture;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.Executor;
/**
@ -235,5 +239,110 @@ public class NioDatagramWorker extends AbstractNioWorker {
write0(channel);
}
@Override
protected void write0(final AbstractNioChannel<?> channel) {
boolean addOpWrite = false;
boolean removeOpWrite = false;
long writtenBytes = 0;
final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
final DatagramChannel ch = ((NioDatagramChannel) channel).getDatagramChannel();
final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
synchronized (channel.writeLock) {
// inform the channel that write is in-progress
channel.inWriteNowLoop = true;
// loop forever...
for (;;) {
MessageEvent evt = channel.currentWriteEvent;
SocketSendBufferPool.SendBuffer buf;
if (evt == null) {
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
removeOpWrite = true;
channel.writeSuspended = false;
break;
}
channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
} else {
buf = channel.currentWriteBuffer;
}
try {
long localWrittenBytes = 0;
SocketAddress raddr = evt.getRemoteAddress();
if (raddr == null) {
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.transferTo(ch);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
if (buf.finished()) {
break;
}
}
} else {
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.transferTo(ch, raddr);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
if (buf.finished()) {
break;
}
}
}
if (localWrittenBytes > 0 || buf.finished()) {
// Successful write - proceed to the next message.
buf.release();
ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
evt = null;
buf = null;
future.setSuccess();
} else {
// Not written at all - perhaps the kernel buffer is full.
addOpWrite = true;
channel.writeSuspended = true;
break;
}
} catch (final AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
} catch (final Throwable t) {
buf.release();
ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
buf = null;
evt = null;
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
channel.inWriteNowLoop = false;
// Initially, the following block was executed after releasing
// the writeLock, but there was a race condition, and it has to be
// executed before releasing the writeLock:
//
// https://issues.jboss.org/browse/NETTY-410
//
if (addOpWrite) {
setOpWrite(channel);
} else if (removeOpWrite) {
clearOpWrite(channel);
}
}
Channels.fireWriteComplete(channel, writtenBytes);
}
}

View File

@ -0,0 +1,90 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket;
import static org.junit.Assert.assertTrue;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.util.internal.ExecutorUtil;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public abstract class AbstractDatagramTest {
private static ExecutorService executor;
@BeforeClass
public static void init() {
executor = Executors.newCachedThreadPool();
}
@AfterClass
public static void destroy() {
ExecutorUtil.terminate(executor);
}
protected abstract DatagramChannelFactory newServerSocketChannelFactory(Executor executor);
protected abstract DatagramChannelFactory newClientSocketChannelFactory(Executor executor);
@Test
public void testSimpleSend() throws Throwable {
ConnectionlessBootstrap sb = new ConnectionlessBootstrap(newServerSocketChannelFactory(executor));
ConnectionlessBootstrap cb = new ConnectionlessBootstrap(newClientSocketChannelFactory(executor));
final CountDownLatch latch = new CountDownLatch(1);
sb.getPipeline().addFirst("handler", new SimpleChannelUpstreamHandler() {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
super.messageReceived(ctx, e);
Assert.assertEquals(1,((ChannelBuffer)e.getMessage()).readInt());
latch.countDown();
}
});
cb.getPipeline().addFirst("handler", new SimpleChannelUpstreamHandler());
Channel sc = sb.bind(new InetSocketAddress(0));
Channel cc = cb.bind(new InetSocketAddress(0));
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
buf.writeInt(1);
cc.write(buf, sc.getLocalAddress());
assertTrue(latch.await(10, TimeUnit.SECONDS));
sc.close().awaitUninterruptibly();
cc.close().awaitUninterruptibly();
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
public class NioNioDatagramTest extends AbstractDatagramTest{
@Override
protected DatagramChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioDatagramChannelFactory(executor);
}
@Override
protected DatagramChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioDatagramChannelFactory(executor);
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
public class NioOioDatagramTest extends AbstractDatagramTest{
@Override
protected DatagramChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioDatagramChannelFactory(executor);
}
@Override
protected DatagramChannelFactory newServerSocketChannelFactory(Executor executor) {
return new OioDatagramChannelFactory(executor);
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
public class OioNioDatagramTest extends AbstractDatagramTest{
@Override
protected DatagramChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioDatagramChannelFactory(executor);
}
@Override
protected DatagramChannelFactory newClientSocketChannelFactory(Executor executor) {
return new OioDatagramChannelFactory(executor);
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
public class OioOioDatagramTest extends AbstractDatagramTest{
@Override
protected DatagramChannelFactory newServerSocketChannelFactory(Executor executor) {
return new OioDatagramChannelFactory(executor);
}
@Override
protected DatagramChannelFactory newClientSocketChannelFactory(Executor executor) {
return new OioDatagramChannelFactory(executor);
}
}