Merge pull request #220 from jestan/master

SCTP Transport Bug Fixes
This commit is contained in:
Norman Maurer 2012-03-05 09:11:42 -08:00
commit 34e328f606
3 changed files with 5 additions and 21 deletions

View File

@ -59,7 +59,7 @@ class SctpNotificationHandler extends AbstractNotificationHandler {
@Override @Override
public HandlerResult handleNotification(ShutdownNotification notification, Object o) { public HandlerResult handleNotification(ShutdownNotification notification, Object o) {
Channels.fireChannelDisconnected(sctpChannel); sctpChannel.worker.close(sctpChannel, Channels.succeededFuture(sctpChannel));
return HandlerResult.RETURN; return HandlerResult.RETURN;
} }

View File

@ -199,8 +199,7 @@ final class SctpSendBufferPool {
final MessageInfo messageInfo = MessageInfo.createOutgoing(ch.association(), null, streamNo); final MessageInfo messageInfo = MessageInfo.createOutgoing(ch.association(), null, streamNo);
messageInfo.payloadProtocolID(protocolId); messageInfo.payloadProtocolID(protocolId);
messageInfo.streamNumber(streamNo); messageInfo.streamNumber(streamNo);
ch.send(buffer, messageInfo); return ch.send(buffer, messageInfo);
return writtenBytes();
} }
@Override @Override
@ -245,8 +244,7 @@ final class SctpSendBufferPool {
final MessageInfo messageInfo = MessageInfo.createOutgoing(ch.association(), null, streamNo); final MessageInfo messageInfo = MessageInfo.createOutgoing(ch.association(), null, streamNo);
messageInfo.payloadProtocolID(protocolId); messageInfo.payloadProtocolID(protocolId);
messageInfo.streamNumber(streamNo); messageInfo.streamNumber(streamNo);
ch.send(buffer, messageInfo); return ch.send(buffer, messageInfo);
return writtenBytes();
} }
@Override @Override

View File

@ -352,26 +352,12 @@ class SctpWorker implements Worker {
final int predictedRecvBufSize = predictor.nextReceiveBufferSize(); final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
boolean messageReceived = false; boolean messageReceived = false;
boolean failure = true;
MessageInfo messageInfo = null; MessageInfo messageInfo = null;
ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize); ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
try { try {
messageInfo = channel.channel.receive(bb, null, notificationHandler); messageInfo = channel.channel.receive(bb, null, notificationHandler);
if (messageInfo != null) { messageReceived = messageInfo != null;
messageReceived = true;
if (!messageInfo.isUnordered()) {
failure = false;
} else {
if (logger.isErrorEnabled()) {
logger.error("Received unordered SCTP Packet");
}
failure = true;
}
} else {
messageReceived = false;
failure = false;
}
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
// Can happen, and does not need a user attention. // Can happen, and does not need a user attention.
} catch (Throwable t) { } catch (Throwable t) {
@ -401,7 +387,7 @@ class SctpWorker implements Worker {
recvBufferPool.release(bb); recvBufferPool.release(bb);
} }
if (channel.channel.isBlocking() && !messageReceived || failure) { if (channel.channel.isBlocking() && !messageReceived) {
k.cancel(); // Some JDK implementations run into an infinite loop without this. k.cancel(); // Some JDK implementations run into an infinite loop without this.
close(channel, succeededFuture(channel)); close(channel, succeededFuture(channel));
return false; return false;