SCTP channel classes ported to Netty 4.x

This commit is contained in:
Jestan Nirojan 2012-09-03 17:42:09 +05:30
parent 359d09bd4d
commit 5395944499
8 changed files with 700 additions and 37 deletions

View File

@ -29,11 +29,11 @@ import static io.netty.channel.ChannelOption.*;
/**
* The default {@link SctpChannelConfig} implementation for SCTP.
*/
class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChannelConfig {
public class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChannelConfig {
private SctpChannel channel;
DefaultSctpChannelConfig(SctpChannel channel) {
public DefaultSctpChannelConfig(SctpChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}

View File

@ -0,0 +1,153 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket;
import com.sun.nio.sctp.SctpServerChannel;
import com.sun.nio.sctp.SctpStandardSocketOptions;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.util.NetworkConstants;
import java.io.IOException;
import java.util.Map;
import static com.sun.nio.sctp.SctpStandardSocketOptions.SCTP_INIT_MAXSTREAMS;
import static com.sun.nio.sctp.SctpStandardSocketOptions.SO_RCVBUF;
import static com.sun.nio.sctp.SctpStandardSocketOptions.SO_SNDBUF;
import static io.netty.channel.ChannelOption.SCTP_NODELAY;
/**
* The default {@link SctpServerChannelConfig} implementation for SCTP.
*/
public class DefaultSctpServerChannelConfig extends DefaultChannelConfig implements SctpServerChannelConfig {
private final SctpServerChannel serverChannel;
private volatile int backlog = NetworkConstants.SOMAXCONN;
/**
* Creates a new instance.
*/
public DefaultSctpServerChannelConfig(com.sun.nio.sctp.SctpServerChannel serverChannel) {
if (serverChannel == null) {
throw new NullPointerException("serverChannel");
}
this.serverChannel = serverChannel;
}
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(
super.getOptions(),
ChannelOption.SO_RCVBUF, ChannelOption.SO_SNDBUF, ChannelOption.SCTP_INIT_MAXSTREAMS);
}
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == ChannelOption.SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());
}
if (option == ChannelOption.SO_SNDBUF) {
return (T) Integer.valueOf(getSendBufferSize());
}
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == ChannelOption.SO_RCVBUF) {
setReceiveBufferSize((Integer) value);
} else if (option == ChannelOption.SO_SNDBUF) {
setSendBufferSize((Integer) value);
} else if (option == ChannelOption.SCTP_INIT_MAXSTREAMS) {
setInitMaxStreams((SctpStandardSocketOptions.InitMaxStreams) value);
} else {
return super.setOption(option, value);
}
return true;
}
@Override
public int getSendBufferSize() {
try {
return serverChannel.getOption(SO_SNDBUF);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setSendBufferSize(int sendBufferSize) {
try {
serverChannel.setOption(SO_SNDBUF, sendBufferSize);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public int getReceiveBufferSize() {
try {
return serverChannel.getOption(SO_RCVBUF);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setReceiveBufferSize(int receiveBufferSize) {
try {
serverChannel.setOption(SO_RCVBUF, receiveBufferSize);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public SctpStandardSocketOptions.InitMaxStreams getInitMaxStreams() {
try {
return serverChannel.getOption(SCTP_INIT_MAXSTREAMS);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setInitMaxStreams(SctpStandardSocketOptions.InitMaxStreams initMaxStreams) {
try {
serverChannel.setOption(SCTP_INIT_MAXSTREAMS, initMaxStreams);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public int getBacklog() {
return backlog;
}
@Override
public void setBacklog(int backlog) {
if (backlog < 0) {
throw new IllegalArgumentException("backlog: " + backlog);
}
this.backlog = backlog;
}
}

View File

@ -19,28 +19,16 @@ import com.sun.nio.sctp.Association;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Set;
/**
* A SCTP/IP {@link io.netty.channel.Channel}
*/
public interface SctpChannel extends Channel {
/**
* Bind a address to the already bound channel to enable multi-homing.
* The Channel bust be bound and yet to be connected.
*/
ChannelFuture bindAddress(InetAddress localAddress);
/**
* Unbind the address from channel's multi-homing address list.
* The address should be added already in multi-homing address list.
*/
ChannelFuture unbindAddress(InetAddress localAddress);
/**
* Returns the underlying SCTP association.
*/
@ -57,13 +45,13 @@ public interface SctpChannel extends Channel {
* with SctpStandardSocketOption.SCTP_PRIMARY_ADDR option).
*/
@Override
InetSocketAddress localAddress();
SocketAddress localAddress();
/**
* Return all local addresses of the SCTP channel.
* Please note that, it will return more than one address if this channel is using multi-homing
*/
Set<InetSocketAddress> allLocalAddresses();
Set<SocketAddress> allLocalAddresses();
/**
* Returns the {@link SctpChannelConfig} configuration of the channel.
@ -82,12 +70,12 @@ public interface SctpChannel extends Channel {
* calling the local SCTP stack with SctpStandardSocketOption.SCTP_SET_PEER_PRIMARY_ADDR option)
*/
@Override
InetSocketAddress remoteAddress();
SocketAddress remoteAddress();
/**
* Return all remote addresses of the SCTP server channel.
* Please note that, it will return more than one address if the remote is using multi-homing.
*/
Set<InetSocketAddress> allRemoteAddresses();
Set<SocketAddress> allRemoteAddresses();
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket;
import com.sun.nio.sctp.Notification;
public class SctpNotification {
private Notification notification;
private Object attachment;
public SctpNotification(Notification notification, Object attachment) {
this.notification = notification;
this.attachment = attachment;
}
public Notification getNotification() {
return notification;
}
public Object getAttachment() {
return attachment;
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket;
import com.sun.nio.sctp.AbstractNotificationHandler;
import com.sun.nio.sctp.AssociationChangeNotification;
import com.sun.nio.sctp.HandlerResult;
import com.sun.nio.sctp.Notification;
import com.sun.nio.sctp.PeerAddressChangeNotification;
import com.sun.nio.sctp.SendFailedNotification;
import com.sun.nio.sctp.ShutdownNotification;
import io.netty.channel.ChannelPipeline;
public class SctpNotificationHandler extends AbstractNotificationHandler<Object> {
private final SctpChannel sctpChannel;
private final ChannelPipeline pipeline;
public SctpNotificationHandler(SctpChannel sctpChannel) {
this.sctpChannel = sctpChannel;
pipeline = sctpChannel.pipeline();
}
@Override
public HandlerResult handleNotification(AssociationChangeNotification notification, Object o) {
updateInboundBuffer(notification, o);
return HandlerResult.CONTINUE;
}
@Override
public HandlerResult handleNotification(PeerAddressChangeNotification notification, Object o) {
updateInboundBuffer(notification, o);
return HandlerResult.CONTINUE;
}
@Override
public HandlerResult handleNotification(SendFailedNotification notification, Object o) {
updateInboundBuffer(notification, o);
return HandlerResult.CONTINUE;
}
@Override
public HandlerResult handleNotification(ShutdownNotification notification, Object o) {
sctpChannel.close();
return HandlerResult.RETURN;
}
private void updateInboundBuffer(Notification notification, Object o) {
pipeline.inboundMessageBuffer().add(new SctpNotification(notification, o));
}
}

View File

@ -15,11 +15,9 @@
*/
package io.netty.channel.socket;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ServerChannel;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Set;
/**
@ -29,18 +27,6 @@ import java.util.Set;
* underlying JDK SCTP Server Channel like multi-homing etc.
*/
public interface SctpServerChannel extends ServerChannel {
/**
* Bind a address to the already bound channel to enable multi-homing.
* The Channel bust be bound and yet to be connected.
*/
ChannelFuture bindAddress(InetAddress localAddress);
/**
* Unbind the address from channel's multi-homing address list.
* The address should be added already in multi-homing address list.
*/
ChannelFuture unbindAddress(InetAddress localAddress);
/**
* Returns the {@link SctpServerChannelConfig} configuration of the channel.
@ -59,11 +45,11 @@ public interface SctpServerChannel extends ServerChannel {
* with SctpStandardSocketOption.SCTP_PRIMARY_ADDR option).
*/
@Override
InetSocketAddress localAddress();
SocketAddress localAddress();
/**
* Return all local addresses of the SCTP server channel.
* Please note that, it will return more than one address if this channel is using multi-homing
*/
Set<InetSocketAddress> allLocalAddresses();
Set<SocketAddress> allLocalAddresses();
}

View File

@ -0,0 +1,280 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import com.sun.nio.sctp.Association;
import com.sun.nio.sctp.MessageInfo;
import com.sun.nio.sctp.NotificationHandler;
import com.sun.nio.sctp.SctpChannel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBufType;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.socket.DefaultSctpChannelConfig;
import io.netty.channel.socket.SctpChannelConfig;
import io.netty.channel.socket.SctpFrame;
import io.netty.channel.socket.SctpNotificationHandler;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
public class NioSctpChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.SctpChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.MESSAGE, false);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSctpChannel.class);
private final SctpChannelConfig config;
private final NotificationHandler notificationHandler;
private static SctpChannel newSctpChannel() {
try {
return SctpChannel.open();
} catch (IOException e) {
throw new ChannelException("Failed to open a sctp channel.", e);
}
}
public NioSctpChannel() {
this(newSctpChannel());
}
public NioSctpChannel(SctpChannel sctpChannel) {
this(null, null, sctpChannel);
}
public NioSctpChannel(Channel parent, Integer id, SctpChannel sctpChannel) {
super(parent, id, sctpChannel, SelectionKey.OP_READ);
try {
sctpChannel.configureBlocking(false);
} catch (IOException e) {
try {
sctpChannel.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized sctp channel.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
config = new DefaultSctpChannelConfig(sctpChannel);
notificationHandler = new SctpNotificationHandler(this);
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
public Association association() {
try {
return javaChannel().association();
} catch (IOException e) {
return null;
}
}
@Override
public Set<SocketAddress> allLocalAddresses() {
try {
final Set<SocketAddress> allLocalAddresses = javaChannel().getAllLocalAddresses();
final Set<SocketAddress> addresses = new HashSet<SocketAddress>(allLocalAddresses.size());
for (SocketAddress socketAddress : allLocalAddresses) {
addresses.add(socketAddress);
}
return addresses;
} catch (Throwable t) {
return Collections.emptySet();
}
}
@Override
public SctpChannelConfig config() {
return config;
}
@Override
public Set<SocketAddress> allRemoteAddresses() {
try {
final Set<SocketAddress> allLocalAddresses = javaChannel().getRemoteAddresses();
final Set<SocketAddress> addresses = new HashSet<SocketAddress>(allLocalAddresses.size());
for (SocketAddress socketAddress : allLocalAddresses) {
addresses.add(socketAddress);
}
return addresses;
} catch (Throwable t) {
return Collections.emptySet();
}
}
@Override
protected SctpChannel javaChannel() {
return (SctpChannel) super.javaChannel();
}
@Override
public boolean isActive() {
SctpChannel ch = javaChannel();
return ch.isOpen() && association() != null;
}
@Override
protected SocketAddress localAddress0() {
try {
for (SocketAddress address : javaChannel().getAllLocalAddresses()) {
return address;
}
} catch (IOException e) {
// ignore
}
return null;
}
@Override
protected SocketAddress remoteAddress0() {
try {
for (SocketAddress address : javaChannel().getRemoteAddresses()) {
return address;
}
} catch (IOException e) {
// ignore
}
return null;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress);
}
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
javaChannel().bind(localAddress);
}
boolean success = false;
try {
boolean connected = javaChannel().connect(remoteAddress);
if (connected) {
selectionKey().interestOps(SelectionKey.OP_READ);
} else {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
@Override
protected void doFinishConnect() throws Exception {
if (!javaChannel().finishConnect()) {
throw new Error();
}
selectionKey().interestOps(SelectionKey.OP_READ);
}
@Override
protected void doDisconnect() throws Exception {
doClose();
}
@Override
protected void doClose() throws Exception {
javaChannel().close();
}
@Override
protected int doReadMessages(MessageBuf<Object> buf) throws Exception {
SctpChannel ch = javaChannel();
ByteBuffer data = ByteBuffer.allocate(config().getReceiveBufferSize());
MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
if (messageInfo == null) {
return 0;
}
data.flip();
buf.add(new SctpFrame(messageInfo, Unpooled.wrappedBuffer(data)));
return 1;
}
@Override
protected int doWriteMessages(MessageBuf<Object> buf, boolean lastSpin) throws Exception {
SctpFrame packet = (SctpFrame) buf.peek();
ByteBuf data = packet.getPayloadBuffer();
int dataLen = data.readableBytes();
ByteBuffer nioData;
if (data.hasNioBuffer()) {
nioData = data.nioBuffer();
} else {
nioData = ByteBuffer.allocate(dataLen);
data.getBytes(data.readerIndex(), nioData);
nioData.flip();
}
final MessageInfo messageInfo = MessageInfo.createOutgoing(association(), null, packet.getStreamIdentifier());
messageInfo.payloadProtocolID(packet.getProtocolIdentifier());
messageInfo.streamNumber(packet.getStreamIdentifier());
final int writtenBytes = javaChannel().send(nioData, messageInfo);
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
if (writtenBytes <= 0 && dataLen > 0) {
// Did not write a packet.
// 1) If 'lastSpin' is false, the caller will call this method again real soon.
// - Do not update OP_WRITE.
// 2) If 'lastSpin' is true, the caller will not retry.
// - Set OP_WRITE so that the event loop calls flushForcibly() later.
if (lastSpin) {
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
}
return 0;
}
// Wrote a packet.
buf.remove();
if (buf.isEmpty()) {
// Wrote the outbound buffer completely - clear OP_WRITE.
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
}
return 1;
}
}

View File

@ -0,0 +1,154 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import com.sun.nio.sctp.SctpChannel;
import com.sun.nio.sctp.SctpServerChannel;
import io.netty.buffer.ChannelBufType;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.socket.DefaultSctpServerChannelConfig;
import io.netty.channel.socket.SctpServerChannelConfig;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
public class NioSctpServerChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.SctpServerChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.MESSAGE, false);
private static SctpServerChannel newSocket() {
try {
return SctpServerChannel.open();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
private final SctpServerChannelConfig config;
public NioSctpServerChannel() {
super(null, null, newSocket(), SelectionKey.OP_ACCEPT);
config = new DefaultSctpServerChannelConfig(this.javaChannel());
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
public Set<SocketAddress> allLocalAddresses() {
try {
final Set<SocketAddress> allLocalAddresses = javaChannel().getAllLocalAddresses();
final Set<SocketAddress> addresses = new HashSet<SocketAddress>(allLocalAddresses.size());
for (SocketAddress socketAddress : allLocalAddresses) {
addresses.add(socketAddress);
}
return addresses;
} catch (Throwable t) {
return Collections.emptySet();
}
}
@Override
public SctpServerChannelConfig config() {
return config;
}
@Override
public boolean isActive() {
return isOpen() && !allLocalAddresses().isEmpty();
}
@Override
public InetSocketAddress remoteAddress() {
return null;
}
@Override
protected SctpServerChannel javaChannel() {
return (SctpServerChannel) super.javaChannel();
}
@Override
protected SocketAddress localAddress0() {
try {
for (SocketAddress address : javaChannel().getAllLocalAddresses()) {
return address;
}
} catch (IOException e) {
// ignore
}
return null;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress, config.getBacklog());
SelectionKey selectionKey = selectionKey();
selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_ACCEPT);
}
@Override
protected void doClose() throws Exception {
javaChannel().close();
}
@Override
protected int doReadMessages(MessageBuf<Object> buf) throws Exception {
SctpChannel ch = javaChannel().accept();
if (ch == null) {
return 0;
}
buf.add(new NioSctpChannel(this, null, ch));
return 1;
}
// Unnecessary stuff
@Override
protected boolean doConnect(
SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doFinishConnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
@Override
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected int doWriteMessages(MessageBuf<Object> buf, boolean lastSpin) throws Exception {
throw new UnsupportedOperationException();
}
}