Fixed OioSctpChannel event loop issue #632 by using 3 different selectors

This commit is contained in:
Jestan Nirojan 2012-11-23 01:17:13 +05:30 committed by Norman Maurer
parent 4229b23a51
commit 44142efe55
2 changed files with 124 additions and 33 deletions

View File

@ -38,6 +38,8 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@ -60,6 +62,11 @@ public class OioSctpChannel extends AbstractOioMessageChannel
private final SctpChannel ch;
private final SctpChannelConfig config;
private final Selector readSelector;
private final Selector writeSelector;
private final Selector connectSelector;
private final NotificationHandler<?> notificationHandler;
private static SctpChannel openChannel() {
@ -99,7 +106,15 @@ public class OioSctpChannel extends AbstractOioMessageChannel
this.ch = ch;
boolean success = false;
try {
ch.configureBlocking(true);
ch.configureBlocking(false);
readSelector = Selector.open();
writeSelector = Selector.open();
connectSelector = Selector.open();
ch.register(readSelector, SelectionKey.OP_READ);
ch.register(writeSelector, SelectionKey.OP_WRITE);
ch.register(connectSelector, SelectionKey.OP_CONNECT);
config = new DefaultSctpChannelConfig(ch);
notificationHandler = new SctpNotificationHandler(this);
success = true;
@ -133,45 +148,77 @@ public class OioSctpChannel extends AbstractOioMessageChannel
@Override
protected int doReadMessages(MessageBuf<Object> buf) throws Exception {
if (readSuspended) {
if (readSuspended || !readSelector.isOpen()) {
return 0;
}
ByteBuffer data = ByteBuffer.allocate(config().getReceiveBufferSize());
MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
if (messageInfo == null) {
return 0;
int readMessages = 0;
final int selectedKeys = readSelector.select(SO_TIMEOUT);
final boolean keysSelected = selectedKeys > 0;
if (!keysSelected) {
return readMessages;
}
data.flip();
buf.add(new SctpMessage(messageInfo, Unpooled.wrappedBuffer(data)));
Set<SelectionKey> reableKeys = readSelector.selectedKeys();
try {
for (SelectionKey _ : reableKeys) {
ByteBuffer data = ByteBuffer.allocate(config().getReceiveBufferSize());
MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
if (messageInfo == null) {
return readMessages;
}
if (readSuspended) {
return 0;
} else {
return 1;
data.flip();
buf.add(new SctpMessage(messageInfo, Unpooled.wrappedBuffer(data)));
readMessages ++;
if (readSuspended) {
return readMessages;
}
}
} finally {
reableKeys.clear();
}
return readMessages;
}
@Override
protected void doWriteMessages(MessageBuf<Object> buf) throws Exception {
SctpMessage packet = (SctpMessage) buf.poll();
ByteBuf data = packet.getPayloadBuffer();
int dataLen = data.readableBytes();
ByteBuffer nioData;
if (data.nioBufferCount() == 1) {
nioData = data.nioBuffer();
} else {
nioData = ByteBuffer.allocate(dataLen);
data.getBytes(data.readerIndex(), nioData);
nioData.flip();
if (!writeSelector.isOpen()) {
return;
}
final int selectedKeys = writeSelector.select(SO_TIMEOUT);
if (selectedKeys > 0) {
final Set<SelectionKey> writableKeys = writeSelector.selectedKeys();
for (SelectionKey _ : writableKeys) {
SctpMessage packet = (SctpMessage) buf.poll();
if (packet == null) {
return;
}
ByteBuf data = packet.getPayloadBuffer();
int dataLen = data.readableBytes();
ByteBuffer nioData;
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.getStreamIdentifier());
mi.payloadProtocolID(packet.getProtocolIdentifier());
mi.streamNumber(packet.getStreamIdentifier());
if (data.nioBufferCount() != -1) {
nioData = data.nioBuffer();
} else {
nioData = ByteBuffer.allocate(dataLen);
data.getBytes(data.readerIndex(), nioData);
nioData.flip();
}
ch.send(nioData, mi);
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.getStreamIdentifier());
mi.payloadProtocolID(packet.getProtocolIdentifier());
mi.streamNumber(packet.getStreamIdentifier());
ch.send(nioData, mi);
}
writableKeys.clear();
}
}
@Override
@ -257,7 +304,21 @@ public class OioSctpChannel extends AbstractOioMessageChannel
boolean success = false;
try {
ch.connect(remoteAddress);
success = true;
boolean finishConnect = false;
while (!finishConnect) {
if (connectSelector.select(SO_TIMEOUT) >= 0) {
final Set<SelectionKey> selectionKeys = connectSelector.selectedKeys();
for (SelectionKey key : selectionKeys) {
if (key.isConnectable()) {
selectionKeys.clear();
finishConnect = true;
break;
}
}
selectionKeys.clear();
}
}
success = ch.finishConnect();
} finally {
if (!success) {
doClose();
@ -272,9 +333,20 @@ public class OioSctpChannel extends AbstractOioMessageChannel
@Override
protected void doClose() throws Exception {
closeSelector("read", readSelector);
closeSelector("write", writeSelector);
closeSelector("connect", connectSelector);
ch.close();
}
private void closeSelector(String selectorName, Selector selector) {
try {
selector.close();
} catch (IOException e) {
logger.warn("Failed to close a " + selectorName + " selector.", e);
}
}
@Override
public ChannelFuture bindAddress(InetAddress localAddress) {
return bindAddress(localAddress, newFuture());

View File

@ -29,6 +29,8 @@ import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@ -57,8 +59,9 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel
}
}
final SctpServerChannel sch;
private final SctpServerChannel sch;
private final SctpServerChannelConfig config;
private final Selector selector;
/**
* Create a new instance with an new {@link SctpServerChannel}
@ -91,7 +94,9 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel
this.sch = sch;
boolean success = false;
try {
sch.configureBlocking(true);
sch.configureBlocking(false);
selector = Selector.open();
sch.register(selector, SelectionKey.OP_ACCEPT);
config = new DefaultSctpServerChannelConfig(sch);
success = true;
} catch (Exception e) {
@ -166,6 +171,11 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel
@Override
protected void doClose() throws Exception {
try {
selector.close();
} catch (IOException e) {
logger.warn("Failed to close a selector.", e);
}
sch.close();
}
@ -181,11 +191,20 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel
SctpChannel s = null;
try {
s = sch.accept();
if (s != null) {
buf.add(new OioSctpChannel(this, null, s));
return 1;
final int selectedKeys = selector.select(SO_TIMEOUT);
if (selectedKeys > 0) {
final Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey key : selectionKeys) {
if (key.isAcceptable()) {
s = sch.accept();
if (s != null) {
buf.add(new OioSctpChannel(this, null, s));
}
}
}
return selectedKeys;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted sctp channel.", t);
if (s != null) {