diff --git a/example/src/main/java/io/netty/example/sctp/SctpClient.java b/example/src/main/java/io/netty/example/sctp/SctpClient.java
index 3378f65f3a..c093e15906 100644
--- a/example/src/main/java/io/netty/example/sctp/SctpClient.java
+++ b/example/src/main/java/io/netty/example/sctp/SctpClient.java
@@ -44,7 +44,6 @@ public class SctpClient {
// Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap(
new SctpClientSocketChannelFactory(
- Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
final ExecutionHandler executionHandler = new ExecutionHandler(
diff --git a/example/src/main/java/io/netty/example/sctp/SctpServer.java b/example/src/main/java/io/netty/example/sctp/SctpServer.java
index a45a66e9ff..186e6733e3 100644
--- a/example/src/main/java/io/netty/example/sctp/SctpServer.java
+++ b/example/src/main/java/io/netty/example/sctp/SctpServer.java
@@ -41,7 +41,6 @@ public class SctpServer {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new SctpServerSocketChannelFactory(
- Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
final ExecutionHandler executionHandler = new ExecutionHandler(
diff --git a/transport-sctp/src/test/java/io/netty/testsuite/util/SctpSocketAddresses.java b/transport-sctp/src/main/java/com/sun/nio/sctp/package-info.java
similarity index 66%
rename from transport-sctp/src/test/java/io/netty/testsuite/util/SctpSocketAddresses.java
rename to transport-sctp/src/main/java/com/sun/nio/sctp/package-info.java
index 1435c414a2..beefdb38ca 100644
--- a/transport-sctp/src/test/java/io/netty/testsuite/util/SctpSocketAddresses.java
+++ b/transport-sctp/src/main/java/com/sun/nio/sctp/package-info.java
@@ -14,10 +14,9 @@
* under the License.
*/
-package io.netty.testsuite.util;
-
-public class SctpSocketAddresses {
- //io.netty.util.SocketAddresses.LOCALHOST interface has MTU SIZE issues with SCTP, we have to use local loop back interface for testing
- public final static String LOOP_BACK = "127.0.0.1";
- public final static String LOOP_BACK2 = "127.0.0.2";
-}
+/**
+ * This package is only included to let SCTP also compile on non-unix operation systems.
+ *
+ * This will not get included in the generated jar!
+ */
+package com.sun.nio.sctp;
diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java
deleted file mode 100644
index 906fc35c99..0000000000
--- a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2011 The Netty Project
- *
- * The Netty Project licenses this file to you under the Apache License,
- * version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package io.netty.channel.sctp;
-
-import io.netty.channel.AbstractChannelSink;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.socket.ChannelRunnableWrapper;
-
-public abstract class AbstractSctpChannelSink extends AbstractChannelSink {
-
- @Override
- public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) {
- Channel ch = pipeline.getChannel();
- if (ch instanceof SctpChannelImpl) {
- SctpChannelImpl channel = (SctpChannelImpl) ch;
- ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(channel, task);
- channel.worker.executeInIoThread(wrapper);
- return wrapper;
-
- } else {
- return super.execute(pipeline, task);
- }
-
- }
-}
diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractWriteRequestQueue.java b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractWriteRequestQueue.java
deleted file mode 100644
index b2b6ce436d..0000000000
--- a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractWriteRequestQueue.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Copyright 2011 The Netty Project
- *
- * The Netty Project licenses this file to you under the Apache License,
- * version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package io.netty.channel.sctp;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import io.netty.channel.MessageEvent;
-import io.netty.util.internal.QueueFactory;
-
-abstract class AbstractWriteRequestQueue implements BlockingQueue {
-
- protected final BlockingQueue queue;
-
- public AbstractWriteRequestQueue() {
- this.queue = QueueFactory.createQueue(MessageEvent.class);
- }
-
- @Override
- public MessageEvent remove() {
- return queue.remove();
- }
-
- @Override
- public MessageEvent element() {
- return queue.element();
- }
-
- @Override
- public MessageEvent peek() {
- return queue.peek();
- }
-
- @Override
- public int size() {
- return queue.size();
- }
-
- @Override
- public boolean isEmpty() {
- return queue.isEmpty();
- }
-
- @Override
- public Iterator iterator() {
- return queue.iterator();
- }
-
- @Override
- public Object[] toArray() {
- return queue.toArray();
- }
-
- @Override
- public T[] toArray(T[] a) {
- return queue.toArray(a);
- }
-
- @Override
- public boolean containsAll(Collection> c) {
- return queue.containsAll(c);
- }
-
- @Override
- public boolean addAll(Collection extends MessageEvent> c) {
- return queue.addAll(c);
- }
-
- @Override
- public boolean removeAll(Collection> c) {
- return queue.removeAll(c);
- }
-
- @Override
- public boolean retainAll(Collection> c) {
- return queue.retainAll(c);
- }
-
- @Override
- public void clear() {
- queue.clear();
- }
-
- @Override
- public boolean add(MessageEvent e) {
- return queue.add(e);
- }
-
- @Override
- public void put(MessageEvent e) throws InterruptedException {
- queue.put(e);
- }
-
- @Override
- public boolean offer(MessageEvent e, long timeout, TimeUnit unit) throws InterruptedException {
- return queue.offer(e, timeout, unit);
- }
-
- @Override
- public MessageEvent take() throws InterruptedException {
- return queue.take();
- }
-
- @Override
- public MessageEvent poll(long timeout, TimeUnit unit) throws InterruptedException {
- return queue.poll(timeout, unit);
- }
-
- @Override
- public int remainingCapacity() {
- return queue.remainingCapacity();
- }
-
- @Override
- public boolean remove(Object o) {
- return queue.remove(o);
- }
-
- @Override
- public boolean contains(Object o) {
- return queue.contains(o);
- }
-
- @Override
- public int drainTo(Collection super MessageEvent> c) {
- return queue.drainTo(c);
- }
-
- @Override
- public int drainTo(Collection super MessageEvent> c, int maxElements) {
- return queue.drainTo(c, maxElements);
- }
-
-}
diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/NioSctpChannelConfig.java b/transport-sctp/src/main/java/io/netty/channel/sctp/NioSctpChannelConfig.java
index c2344793b1..dfa1be49c7 100644
--- a/transport-sctp/src/main/java/io/netty/channel/sctp/NioSctpChannelConfig.java
+++ b/transport-sctp/src/main/java/io/netty/channel/sctp/NioSctpChannelConfig.java
@@ -17,6 +17,7 @@ package io.netty.channel.sctp;
import io.netty.channel.ReceiveBufferSizePredictor;
import io.netty.channel.ReceiveBufferSizePredictorFactory;
+import io.netty.channel.socket.nio.NioChannelConfig;
/**
* A {@link io.netty.channel.sctp.SctpChannelConfig} for a NIO SCTP/IP {@link io.netty.channel.sctp.SctpChannel}.
@@ -43,48 +44,7 @@ import io.netty.channel.ReceiveBufferSizePredictorFactory;
*
*
*/
-public interface NioSctpChannelConfig extends SctpChannelConfig {
-
- /**
- * Returns the high water mark of the write buffer. If the number of bytes
- * queued in the write buffer exceeds this value, {@link io.netty.channel.Channel#isWritable()}
- * will start to return {@code false}.
- */
- int getWriteBufferHighWaterMark();
-
- /**
- * Sets the high water mark of the write buffer. If the number of bytes
- * queued in the write buffer exceeds this value, {@link io.netty.channel.Channel#isWritable()}
- * will start to return {@code false}.
- */
- void setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
-
- /**
- * Returns the low water mark of the write buffer. Once the number of bytes
- * queued in the write buffer exceeded the
- * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
- * dropped down below this value, {@link io.netty.channel.Channel#isWritable()} will return
- * {@code true} again.
- */
- int getWriteBufferLowWaterMark();
-
- /**
- * Sets the low water mark of the write buffer. Once the number of bytes
- * queued in the write buffer exceeded the
- * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
- * dropped down below this value, {@link io.netty.channel.Channel#isWritable()} will return
- * {@code true} again.
- */
- void setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
-
- /**
- * Returns the maximum loop count for a write operation until
- * {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)} returns a non-zero value.
- * It is similar to what a spin lock is used for in concurrency programming.
- * It improves memory utilization and write throughput depending on
- * the platform that JVM runs on. The default value is {@code 16}.
- */
- int getWriteSpinCount();
+public interface NioSctpChannelConfig extends SctpChannelConfig, NioChannelConfig {
/**
* Sets the maximum loop count for a write operation until
diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpAcceptedChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpAcceptedChannel.java
index 7996280614..8e9bca344f 100644
--- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpAcceptedChannel.java
+++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpAcceptedChannel.java
@@ -27,17 +27,13 @@ import static io.netty.channel.Channels.*;
*/
final class SctpAcceptedChannel extends SctpChannelImpl {
- final Thread bossThread;
-
SctpAcceptedChannel(
ChannelFactory factory, ChannelPipeline pipeline,
Channel parent, ChannelSink sink,
- SctpChannel socket, SctpWorker worker, Thread bossThread) {
+ SctpChannel socket, SctpWorker worker) {
super(parent, factory, pipeline, sink, socket, worker);
- this.bossThread = bossThread;
-
setConnected();
fireChannelOpen(this);
fireChannelBound(this, getLocalAddress());
diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelImpl.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelImpl.java
index 2ace041ebe..6c55a1bd83 100644
--- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelImpl.java
+++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelImpl.java
@@ -15,22 +15,7 @@
*/
package io.netty.channel.sctp;
-import static io.netty.channel.Channels.*;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.sun.nio.sctp.Association;
-
-import io.netty.channel.AbstractChannel;
+import static io.netty.channel.Channels.future;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
@@ -38,12 +23,22 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.MessageEvent;
-import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer;
-import io.netty.util.internal.ThreadLocalBoolean;
+import io.netty.channel.sctp.SctpSendBufferPool.SctpSendBuffer;
+import io.netty.channel.socket.nio.AbstractNioChannel;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+
+import com.sun.nio.sctp.Association;
/**
*/
-class SctpChannelImpl extends AbstractChannel implements SctpChannel {
+class SctpChannelImpl extends AbstractNioChannel implements SctpChannel {
private static final int ST_OPEN = 0;
private static final int ST_BOUND = 1;
@@ -51,35 +46,14 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel {
private static final int ST_CLOSED = -1;
volatile int state = ST_OPEN;
- final com.sun.nio.sctp.SctpChannel channel;
- final SctpWorker worker;
private final NioSctpChannelConfig config;
- private volatile InetSocketAddress localAddress;
- private volatile InetSocketAddress remoteAddress;
-
- final Object interestOpsLock = new Object();
- final Object writeLock = new Object();
-
- final Runnable writeTask = new WriteTask();
- final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
-
- final Queue writeBuffer = new WriteRequestQueue();
- final AtomicInteger writeBufferSize = new AtomicInteger();
- final AtomicInteger highWaterMarkCounter = new AtomicInteger();
- boolean inWriteNowLoop;
- boolean writeSuspended;
-
- MessageEvent currentWriteEvent;
- SendBuffer currentWriteBuffer;
final SctpNotificationHandler notificationHandler = new SctpNotificationHandler(this);
public SctpChannelImpl(Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink,
com.sun.nio.sctp.SctpChannel channel, SctpWorker worker) {
- super(parent, factory, pipeline, sink);
+ super(parent, factory, pipeline, sink, worker, new SctpJdkChannel(channel));
- this.channel = channel;
- this.worker = worker;
config = new DefaultNioSctpChannelConfig(channel);
getCloseFuture().addListener(new ChannelFutureListener() {
@@ -90,31 +64,76 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel {
});
}
+ Queue getWriteBufferQueue() {
+ return writeBufferQueue;
+ }
+
+ Object getWriteLock() {
+ return writeLock;
+ }
+
+ Object getInterestedOpsLock() {
+ return interestOpsLock;
+ }
+
+
+ void setWriteSuspended(boolean writeSuspended) {
+ this.writeSuspended = writeSuspended;
+ }
+
+ boolean getWriteSuspended() {
+ return writeSuspended;
+ }
+
+ void setInWriteNowLoop(boolean inWriteNowLoop) {
+ this.inWriteNowLoop = inWriteNowLoop;
+ }
+
+ MessageEvent getCurrentWriteEvent() {
+ return currentWriteEvent;
+ }
+
+ void setCurrentWriteEvent(MessageEvent currentWriteEvent) {
+ this.currentWriteEvent = currentWriteEvent;
+ }
+
+ int getRawInterestOps() {
+ return super.getInterestOps();
+ }
+
+ void setRawInterestOpsNow(int interestOps) {
+ super.setInterestOpsNow(interestOps);
+ }
+
+ SctpSendBuffer getCurrentWriteBuffer() {
+ return (SctpSendBuffer) currentWriteBuffer;
+ }
+
+ void setCurrentWriteBuffer(SctpSendBuffer currentWriteBuffer) {
+ this.currentWriteBuffer = currentWriteBuffer;
+ }
+
+ @Override
+ public SctpWorker getWorker() {
+ return (SctpWorker) super.getWorker();
+ }
+
+
@Override
public NioSctpChannelConfig getConfig() {
return config;
}
@Override
- public InetSocketAddress getLocalAddress() {
- InetSocketAddress localAddress = this.localAddress;
- if (localAddress == null) {
- try {
- final Iterator iterator = channel.getAllLocalAddresses().iterator();
- if (iterator.hasNext()) {
- this.localAddress = localAddress = (InetSocketAddress) iterator.next();
- }
- } catch (Throwable t) {
- return null;
- }
- }
- return localAddress;
+ public SctpJdkChannel getJdkChannel() {
+ return (SctpJdkChannel) super.getJdkChannel();
}
+
@Override
public Set getAllLocalAddresses() {
try {
- final Set allLocalAddresses = channel.getAllLocalAddresses();
+ final Set allLocalAddresses = getJdkChannel().getChannel().getAllLocalAddresses();
final Set addresses = new HashSet(allLocalAddresses.size());
for (SocketAddress socketAddress: allLocalAddresses) {
addresses.add((InetSocketAddress) socketAddress);
@@ -125,26 +144,10 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel {
}
}
- @Override
- public InetSocketAddress getRemoteAddress() {
- InetSocketAddress remoteAddress = this.remoteAddress;
- if (remoteAddress == null) {
- try {
- final Iterator iterator = channel.getRemoteAddresses().iterator();
- if (iterator.hasNext()) {
- this.remoteAddress = remoteAddress = (InetSocketAddress) iterator.next();
- }
- } catch (Throwable t) {
- return null;
- }
- }
- return remoteAddress;
- }
-
@Override
public Set getAllRemoteAddresses() {
try {
- final Set allLocalAddresses = channel.getRemoteAddresses();
+ final Set allLocalAddresses = getJdkChannel().getChannel().getRemoteAddresses();
final Set addresses = new HashSet(allLocalAddresses.size());
for (SocketAddress socketAddress: allLocalAddresses) {
addresses.add((InetSocketAddress) socketAddress);
@@ -172,7 +175,7 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel {
@Override
public Association association() {
try {
- return channel.association();
+ return getJdkChannel().getChannel().association();
} catch (Throwable e) {
return null;
}
@@ -198,7 +201,7 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel {
state = ST_BOUND;
}
- final void setConnected() {
+ protected final void setConnected() {
if (state != ST_CLOSED) {
state = ST_CONNECTED;
}
@@ -208,126 +211,20 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel {
protected boolean setClosed() {
return super.setClosed();
}
-
+
@Override
- public int getInterestOps() {
- if (!isOpen()) {
- return Channel.OP_WRITE;
- }
-
- int interestOps = getRawInterestOps();
- int writeBufferSize = this.writeBufferSize.get();
- if (writeBufferSize != 0) {
- if (highWaterMarkCounter.get() > 0) {
- int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
- if (writeBufferSize >= lowWaterMark) {
- interestOps |= Channel.OP_WRITE;
- } else {
- interestOps &= ~Channel.OP_WRITE;
- }
- } else {
- int highWaterMark = getConfig().getWriteBufferHighWaterMark();
- if (writeBufferSize >= highWaterMark) {
- interestOps |= Channel.OP_WRITE;
- } else {
- interestOps &= ~Channel.OP_WRITE;
+ protected WriteRequestQueue createRequestQueue() {
+ return new WriteRequestQueue() {
+
+ @Override
+ protected int getMessageSize(MessageEvent e) {
+ Object m = e.getMessage();
+ if (m instanceof SctpFrame) {
+ return ((SctpFrame) m).getPayloadBuffer().readableBytes();
}
+ return 0;
}
- } else {
- interestOps &= ~Channel.OP_WRITE;
- }
-
- return interestOps;
+ };
}
- int getRawInterestOps() {
- return super.getInterestOps();
- }
-
- void setRawInterestOpsNow(int interestOps) {
- super.setInterestOpsNow(interestOps);
- }
-
- @Override
- public ChannelFuture write(Object message, SocketAddress remoteAddress) {
- if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
- return super.write(message, null);
- } else {
- return getUnsupportedOperationFuture();
- }
- }
-
- private final class WriteRequestQueue extends AbstractWriteRequestQueue {
-
- private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
-
- WriteRequestQueue() {
- super();
- }
-
- @Override
- public boolean offer(MessageEvent e) {
- boolean success = queue.offer(e);
- assert success;
-
- int messageSize = getMessageSize(e);
- int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
- int highWaterMark = getConfig().getWriteBufferHighWaterMark();
-
- if (newWriteBufferSize >= highWaterMark) {
- if (newWriteBufferSize - messageSize < highWaterMark) {
- highWaterMarkCounter.incrementAndGet();
- if (!notifying.get()) {
- notifying.set(Boolean.TRUE);
- fireChannelInterestChanged(SctpChannelImpl.this);
- notifying.set(Boolean.FALSE);
- }
- }
- }
- return true;
- }
-
- @Override
- public MessageEvent poll() {
- MessageEvent e = queue.poll();
- if (e != null) {
- int messageSize = getMessageSize(e);
- int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
- int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
-
- if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
- if (newWriteBufferSize + messageSize >= lowWaterMark) {
- highWaterMarkCounter.decrementAndGet();
- if (isConnected() && !notifying.get()) {
- notifying.set(Boolean.TRUE);
- fireChannelInterestChanged(SctpChannelImpl.this);
- notifying.set(Boolean.FALSE);
- }
- }
- }
- }
- return e;
- }
-
- private int getMessageSize(MessageEvent e) {
- Object m = e.getMessage();
- if (m instanceof SctpFrame) {
- return ((SctpFrame) m).getPayloadBuffer().readableBytes();
- }
- return 0;
- }
- }
-
- private final class WriteTask implements Runnable {
-
- WriteTask() {
- super();
- }
-
- @Override
- public void run() {
- writeTaskInTaskQueue.set(false);
- worker.writeFromTaskLoop(SctpChannelImpl.this);
- }
- }
}
diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java
index 1e664a0120..9bcc85f908 100644
--- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java
+++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java
@@ -15,56 +15,25 @@
*/
package io.netty.channel.sctp;
-import static io.netty.channel.Channels.*;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetAddress;
-import java.net.SocketAddress;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import static io.netty.channel.Channels.fireChannelBound;
+import static io.netty.channel.Channels.fireExceptionCaught;
+import static io.netty.channel.Channels.succeededFuture;
import io.netty.channel.ChannelEvent;
-import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.MessageEvent;
-import io.netty.logging.InternalLogger;
-import io.netty.logging.InternalLoggerFactory;
-import io.netty.util.internal.DeadLockProofWorker;
-import io.netty.util.internal.QueueFactory;
+import io.netty.channel.socket.nio.AbstractNioChannelSink;
+
+import java.net.InetAddress;
+import java.net.SocketAddress;
+import java.nio.channels.ClosedChannelException;
/**
*/
-class SctpClientPipelineSink extends AbstractSctpChannelSink {
-
- static final InternalLogger logger =
- InternalLoggerFactory.getInstance(SctpClientPipelineSink.class);
-
- final Executor bossExecutor;
- private final Boss boss = new Boss();
- private final SctpWorker[] workers;
- private final AtomicInteger workerIndex = new AtomicInteger();
-
- SctpClientPipelineSink(
- Executor bossExecutor, Executor workerExecutor, int workerCount) {
- this.bossExecutor = bossExecutor;
- workers = new SctpWorker[workerCount];
- for (int i = 0; i < workers.length; i ++) {
- workers[i] = new SctpWorker(workerExecutor);
- }
- }
+class SctpClientPipelineSink extends AbstractNioChannelSink {
@Override
public void eventSunk(
@@ -80,21 +49,21 @@ class SctpClientPipelineSink extends AbstractSctpChannelSink {
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
- channel.worker.close(channel, future);
+ channel.getWorker().close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
} else {
- channel.worker.close(channel, future);
+ channel.getWorker().close(channel, future);
}
break;
case CONNECTED:
if (value != null) {
connect(channel, future, (SocketAddress) value);
} else {
- channel.worker.close(channel, future);
+ channel.getWorker().close(channel, future);
}
break;
case INTEREST_OPS:
@@ -105,16 +74,16 @@ class SctpClientPipelineSink extends AbstractSctpChannelSink {
SctpUnbindAddressEvent unbindAddressEvent = (SctpUnbindAddressEvent) event;
unbindAddress(channel, unbindAddressEvent.getFuture(), unbindAddressEvent.getValue());
} else {
- channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
+ channel.getWorker().setInterestOps(channel, future, ((Integer) value).intValue());
}
break;
}
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
SctpChannelImpl channel = (SctpChannelImpl) event.getChannel();
- boolean offered = channel.writeBuffer.offer(event);
+ boolean offered = channel.getWriteBufferQueue().offer(event);
assert offered;
- channel.worker.writeFromUserCode(channel);
+ channel.getWorker().writeFromUserCode(channel);
}
}
@@ -122,7 +91,7 @@ class SctpClientPipelineSink extends AbstractSctpChannelSink {
SctpClientChannel channel, ChannelFuture future,
SocketAddress localAddress) {
try {
- channel.channel.bind(localAddress);
+ channel.getJdkChannel().bind(localAddress);
channel.boundManually = true;
channel.setBound();
future.setSuccess();
@@ -137,7 +106,7 @@ class SctpClientPipelineSink extends AbstractSctpChannelSink {
SctpClientChannel channel, ChannelFuture future,
InetAddress localAddress) {
try {
- channel.channel.bindAddress(localAddress);
+ channel.getJdkChannel().getChannel().bindAddress(localAddress);
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
@@ -149,7 +118,7 @@ class SctpClientPipelineSink extends AbstractSctpChannelSink {
SctpClientChannel channel, ChannelFuture future,
InetAddress localAddress) {
try {
- channel.channel.unbindAddress(localAddress);
+ channel.getJdkChannel().getChannel().unbindAddress(localAddress);
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
@@ -163,293 +132,28 @@ class SctpClientPipelineSink extends AbstractSctpChannelSink {
final SctpClientChannel channel, final ChannelFuture cf,
SocketAddress remoteAddress) {
try {
- if (channel.channel.connect(remoteAddress)) {
- channel.worker.register(channel, cf);
- } else {
- channel.getCloseFuture().addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture f)
- throws Exception {
- if (!cf.isDone()) {
- cf.setFailure(new ClosedChannelException());
- }
+ channel.getJdkChannel().connect(remoteAddress);
+
+ channel.getCloseFuture().addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture f)
+ throws Exception {
+ if (!cf.isDone()) {
+ cf.setFailure(new ClosedChannelException());
}
- });
- cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
- channel.connectFuture = cf;
- boss.register(channel);
- }
+ }
+ });
+ cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+ channel.connectFuture = cf;
+ channel.getWorker().registerWithWorker(channel, cf);
+
} catch (Throwable t) {
cf.setFailure(t);
fireExceptionCaught(channel, t);
- channel.worker.close(channel, succeededFuture(channel));
+ channel.getWorker().close(channel, succeededFuture(channel));
}
}
- SctpWorker nextWorker() {
- return workers[Math.abs(
- workerIndex.getAndIncrement() % workers.length)];
- }
-
- private final class Boss implements Runnable {
-
- volatile Selector selector;
- private boolean started;
- private final AtomicBoolean wakenUp = new AtomicBoolean();
- private final Object startStopLock = new Object();
- private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class);
-
- Boss() {
- super();
- }
-
- void register(SctpClientChannel channel) {
- Runnable registerTask = new RegisterTask(this, channel);
- Selector selector;
-
- synchronized (startStopLock) {
- if (!started) {
- // Open a selector if this worker didn't start yet.
- try {
- this.selector = selector = Selector.open();
- } catch (Throwable t) {
- throw new ChannelException(
- "Failed to create a selector.", t);
- }
-
- // Start the worker thread with the new Selector.
- boolean success = false;
- try {
- DeadLockProofWorker.start(bossExecutor, this);
- success = true;
- } finally {
- if (!success) {
- // Release the Selector if the execution fails.
- try {
- selector.close();
- } catch (Throwable t) {
- logger.warn("Failed to close a selector.", t);
- }
- this.selector = selector = null;
- // The method will return to the caller at this point.
- }
- }
- } else {
- // Use the existing selector if this worker has been started.
- selector = this.selector;
- }
-
- assert selector != null && selector.isOpen();
-
- started = true;
- boolean offered = registerTaskQueue.offer(registerTask);
- assert offered;
- }
-
- if (wakenUp.compareAndSet(false, true)) {
- selector.wakeup();
- }
- }
-
- @Override
- public void run() {
- boolean shutdown = false;
- Selector selector = this.selector;
- long lastConnectTimeoutCheckTimeNanos = System.nanoTime();
- for (;;) {
- wakenUp.set(false);
-
- try {
- int selectedKeyCount = selector.select(10);
-
- // 'wakenUp.compareAndSet(false, true)' is always evaluated
- // before calling 'selector.wakeup()' to reduce the wake-up
- // overhead. (Selector.wakeup() is an expensive operation.)
- //
- // However, there is a race condition in this approach.
- // The race condition is triggered when 'wakenUp' is set to
- // true too early.
- //
- // 'wakenUp' is set to true too early if:
- // 1) Selector is waken up between 'wakenUp.set(false)' and
- // 'selector.select(...)'. (BAD)
- // 2) Selector is waken up between 'selector.select(...)' and
- // 'if (wakenUp.get()) { ... }'. (OK)
- //
- // In the first case, 'wakenUp' is set to true and the
- // following 'selector.select(...)' will wake up immediately.
- // Until 'wakenUp' is set to false again in the next round,
- // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
- // any attempt to wake up the Selector will fail, too, causing
- // the following 'selector.select(...)' call to block
- // unnecessarily.
- //
- // To fix this problem, we wake up the selector again if wakenUp
- // is true immediately after selector.select(...).
- // It is inefficient in that it wakes up the selector for both
- // the first case (BAD - wake-up required) and the second case
- // (OK - no wake-up required).
-
- if (wakenUp.get()) {
- selector.wakeup();
- }
-
- processRegisterTaskQueue();
-
- if (selectedKeyCount > 0) {
- processSelectedKeys(selector.selectedKeys());
- }
-
- // Handle connection timeout every 10 milliseconds approximately.
- long currentTimeNanos = System.nanoTime();
- if (currentTimeNanos - lastConnectTimeoutCheckTimeNanos >= 10 * 1000000L) {
- lastConnectTimeoutCheckTimeNanos = currentTimeNanos;
- processConnectTimeout(selector.keys(), currentTimeNanos);
- }
-
- // Exit the loop when there's nothing to handle.
- // The shutdown flag is used to delay the shutdown of this
- // loop to avoid excessive Selector creation when
- // connection attempts are made in a one-by-one manner
- // instead of concurrent manner.
- if (selector.keys().isEmpty()) {
- if (shutdown ||
- bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) {
-
- synchronized (startStopLock) {
- if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
- started = false;
- try {
- selector.close();
- } catch (IOException e) {
- if (logger.isWarnEnabled()) {
- logger.warn(
- "Failed to close a selector.", e);
- }
- } finally {
- this.selector = null;
- }
- break;
- } else {
- shutdown = false;
- }
- }
- } else {
- // Give one more second.
- shutdown = true;
- }
- } else {
- shutdown = false;
- }
- } catch (Throwable t) {
- if (logger.isWarnEnabled()) {
- logger.warn(
- "Unexpected exception in the selector loop.", t);
- }
-
- // Prevent possible consecutive immediate failures.
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // Ignore.
- }
- }
- }
- }
-
- private void processRegisterTaskQueue() {
- for (;;) {
- final Runnable task = registerTaskQueue.poll();
- if (task == null) {
- break;
- }
-
- task.run();
- }
- }
-
- private void processSelectedKeys(Set selectedKeys) {
- for (Iterator i = selectedKeys.iterator(); i.hasNext();) {
- SelectionKey k = i.next();
- i.remove();
-
- if (!k.isValid()) {
- close(k);
- continue;
- }
-
- if (k.isConnectable()) {
- connect(k);
- }
- }
- }
-
- private void processConnectTimeout(Set keys, long currentTimeNanos) {
- ConnectException cause = null;
- for (SelectionKey k: keys) {
- if (!k.isValid()) {
- continue;
- }
-
- SctpClientChannel ch = (SctpClientChannel) k.attachment();
- if (ch.connectDeadlineNanos > 0 &&
- currentTimeNanos >= ch.connectDeadlineNanos) {
-
- if (cause == null) {
- cause = new ConnectException("connection timed out");
- }
-
- ch.connectFuture.setFailure(cause);
- fireExceptionCaught(ch, cause);
- ch.worker.close(ch, succeededFuture(ch));
- }
- }
- }
-
- private void connect(SelectionKey k) {
- SctpClientChannel ch = (SctpClientChannel) k.attachment();
- try {
- if (ch.channel.finishConnect()) {
- k.cancel();
- ch.worker.register(ch, ch.connectFuture);
- }
- } catch (Throwable t) {
- ch.connectFuture.setFailure(t);
- fireExceptionCaught(ch, t);
- k.cancel(); // Some JDK implementations run into an infinite loop without this.
- ch.worker.close(ch, succeededFuture(ch));
- }
- }
-
- private void close(SelectionKey k) {
- SctpClientChannel ch = (SctpClientChannel) k.attachment();
- ch.worker.close(ch, succeededFuture(ch));
- }
- }
-
- private static final class RegisterTask implements Runnable {
- private final Boss boss;
- private final SctpClientChannel channel;
-
- RegisterTask(Boss boss, SctpClientChannel channel) {
- this.boss = boss;
- this.channel = channel;
- }
-
- @Override
- public void run() {
- try {
- channel.channel.register(
- boss.selector, SelectionKey.OP_CONNECT, channel);
- } catch (ClosedChannelException e) {
- channel.worker.close(channel, succeededFuture(channel));
- }
-
- int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
- if (connectTimeout > 0) {
- channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
- }
- }
- }
+
}
diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientSocketChannelFactory.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientSocketChannelFactory.java
index 151e490b64..9fe1b23a3f 100644
--- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientSocketChannelFactory.java
+++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientSocketChannelFactory.java
@@ -17,7 +17,10 @@ package io.netty.channel.sctp;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelPipeline;
-import io.netty.util.internal.ExecutorUtil;
+import io.netty.channel.ChannelSink;
+import io.netty.channel.socket.nio.SelectorUtil;
+import io.netty.channel.socket.nio.WorkerPool;
+import io.netty.util.ExternalResourceReleasable;
import java.util.concurrent.Executor;
@@ -74,9 +77,8 @@ import java.util.concurrent.Executor;
*/
public class SctpClientSocketChannelFactory implements ChannelFactory {
- private final Executor bossExecutor;
- private final Executor workerExecutor;
- private final SctpClientPipelineSink sink;
+ private final WorkerPool workerPool;
+ private final ChannelSink sink;
/**
* Creates a new instance. Calling this constructor is same with calling
@@ -84,53 +86,45 @@ public class SctpClientSocketChannelFactory implements ChannelFactory {
* the number of available processors in the machine. The number of
* available processors is obtained by {@link Runtime#availableProcessors()}.
*
- * @param bossExecutor
- * the {@link java.util.concurrent.Executor} which will execute the boss thread
* @param workerExecutor
* the {@link java.util.concurrent.Executor} which will execute the I/O worker threads
*/
- public SctpClientSocketChannelFactory(
- Executor bossExecutor, Executor workerExecutor) {
- this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
+ public SctpClientSocketChannelFactory(Executor workerExecutor) {
+ this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
}
/**
* Creates a new instance.
- *
- * @param bossExecutor
- * the {@link java.util.concurrent.Executor} which will execute the boss thread
+ *
* @param workerExecutor
* the {@link java.util.concurrent.Executor} which will execute the I/O worker threads
* @param workerCount
* the maximum number of I/O worker threads
*/
- public SctpClientSocketChannelFactory(
- Executor bossExecutor, Executor workerExecutor,
+ public SctpClientSocketChannelFactory(Executor workerExecutor,
int workerCount) {
- if (bossExecutor == null) {
- throw new NullPointerException("bossExecutor");
+ this(new SctpWorkerPool(workerExecutor, workerCount, true));
+ }
+
+ public SctpClientSocketChannelFactory(WorkerPool workerPool) {
+ if (workerPool == null) {
+ throw new NullPointerException("workerPool");
}
- if (workerExecutor == null) {
- throw new NullPointerException("workerExecutor");
- }
- if (workerCount <= 0) {
- throw new IllegalArgumentException(
- "workerCount (" + workerCount + ") " +
- "must be a positive integer.");
- }
-
- this.bossExecutor = bossExecutor;
- this.workerExecutor = workerExecutor;
- sink = new SctpClientPipelineSink(bossExecutor, workerExecutor, workerCount);
+
+ this.workerPool = workerPool;
+ sink = new SctpClientPipelineSink();
}
@Override
public SctpChannel newChannel(ChannelPipeline pipeline) {
- return new SctpClientChannel(this, pipeline, sink, sink.nextWorker());
+ return new SctpClientChannel(this, pipeline, sink, workerPool.nextWorker());
}
@Override
public void releaseExternalResources() {
- ExecutorUtil.terminate(bossExecutor, workerExecutor);
+ if (workerPool instanceof ExternalResourceReleasable) {
+ ((ExternalResourceReleasable) workerPool).releaseExternalResources();
+ }
+
}
}
diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpJdkChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpJdkChannel.java
new file mode 100644
index 0000000000..33bc3b3b88
--- /dev/null
+++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpJdkChannel.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.channel.sctp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+import com.sun.nio.sctp.SctpChannel;
+
+import io.netty.channel.socket.nio.AbstractJdkChannel;
+
+public class SctpJdkChannel extends AbstractJdkChannel {
+
+ SctpJdkChannel(SctpChannel channel) {
+ super(channel);
+ }
+
+ @Override
+ protected SctpChannel getChannel() {
+ return (SctpChannel) super.getChannel();
+ }
+
+ @Override
+ public InetSocketAddress getRemoteSocketAddress() {
+ try {
+ for (SocketAddress address : getChannel().getRemoteAddresses()) {
+ return (InetSocketAddress) address;
+ }
+ } catch (IOException e) {
+ // ignore
+ }
+ return null;
+ }
+
+ @Override
+ public SocketAddress getLocalSocketAddress() {
+ try {
+ for (SocketAddress address : getChannel().getAllLocalAddresses()) {
+ return (InetSocketAddress) address;
+ }
+ } catch (IOException e) {
+ // ignore
+ }
+ return null;
+ }
+
+ @Override
+ public boolean isConnected() {
+ return getChannel().isOpen();
+ }
+
+ @Override
+ public boolean isSocketBound() {
+ try {
+ return !getChannel().getAllLocalAddresses().isEmpty();
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public void disconnectSocket() throws IOException {
+ closeSocket();
+ }
+
+ @Override
+ public void closeSocket() throws IOException {
+ for (SocketAddress address: getChannel().getAllLocalAddresses()) {
+ getChannel().unbindAddress(((InetSocketAddress) address).getAddress());
+ }
+ }
+
+ @Override
+ public void bind(SocketAddress local) throws IOException {
+ getChannel().bind(local);
+ }
+
+ @Override
+ public void connect(SocketAddress remote) throws IOException {
+ getChannel().connect(remote);
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean finishConnect() throws IOException {
+ return getChannel().finishConnect();
+ }
+
+}
diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpNotificationHandler.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpNotificationHandler.java
index 65f2502040..9a4ec240ec 100644
--- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpNotificationHandler.java
+++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpNotificationHandler.java
@@ -59,7 +59,7 @@ class SctpNotificationHandler extends AbstractNotificationHandler