diff --git a/example/src/main/java/io/netty/example/sctp/SctpClient.java b/example/src/main/java/io/netty/example/sctp/SctpClient.java index 3378f65f3a..c093e15906 100644 --- a/example/src/main/java/io/netty/example/sctp/SctpClient.java +++ b/example/src/main/java/io/netty/example/sctp/SctpClient.java @@ -44,7 +44,6 @@ public class SctpClient { // Configure the client. ClientBootstrap bootstrap = new ClientBootstrap( new SctpClientSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); final ExecutionHandler executionHandler = new ExecutionHandler( diff --git a/example/src/main/java/io/netty/example/sctp/SctpServer.java b/example/src/main/java/io/netty/example/sctp/SctpServer.java index a45a66e9ff..186e6733e3 100644 --- a/example/src/main/java/io/netty/example/sctp/SctpServer.java +++ b/example/src/main/java/io/netty/example/sctp/SctpServer.java @@ -41,7 +41,6 @@ public class SctpServer { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new SctpServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); final ExecutionHandler executionHandler = new ExecutionHandler( diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java deleted file mode 100644 index 906fc35c99..0000000000 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.channel.sctp; - -import io.netty.channel.AbstractChannelSink; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.socket.ChannelRunnableWrapper; - -public abstract class AbstractSctpChannelSink extends AbstractChannelSink { - - @Override - public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) { - Channel ch = pipeline.getChannel(); - if (ch instanceof SctpChannelImpl) { - SctpChannelImpl channel = (SctpChannelImpl) ch; - ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(channel, task); - channel.worker.executeInIoThread(wrapper); - return wrapper; - - } else { - return super.execute(pipeline, task); - } - - } -} diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractWriteRequestQueue.java b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractWriteRequestQueue.java deleted file mode 100644 index b2b6ce436d..0000000000 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractWriteRequestQueue.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.sctp; - -import java.util.Collection; -import java.util.Iterator; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - -import io.netty.channel.MessageEvent; -import io.netty.util.internal.QueueFactory; - -abstract class AbstractWriteRequestQueue implements BlockingQueue { - - protected final BlockingQueue queue; - - public AbstractWriteRequestQueue() { - this.queue = QueueFactory.createQueue(MessageEvent.class); - } - - @Override - public MessageEvent remove() { - return queue.remove(); - } - - @Override - public MessageEvent element() { - return queue.element(); - } - - @Override - public MessageEvent peek() { - return queue.peek(); - } - - @Override - public int size() { - return queue.size(); - } - - @Override - public boolean isEmpty() { - return queue.isEmpty(); - } - - @Override - public Iterator iterator() { - return queue.iterator(); - } - - @Override - public Object[] toArray() { - return queue.toArray(); - } - - @Override - public T[] toArray(T[] a) { - return queue.toArray(a); - } - - @Override - public boolean containsAll(Collection c) { - return queue.containsAll(c); - } - - @Override - public boolean addAll(Collection c) { - return queue.addAll(c); - } - - @Override - public boolean removeAll(Collection c) { - return queue.removeAll(c); - } - - @Override - public boolean retainAll(Collection c) { - return queue.retainAll(c); - } - - @Override - public void clear() { - queue.clear(); - } - - @Override - public boolean add(MessageEvent e) { - return queue.add(e); - } - - @Override - public void put(MessageEvent e) throws InterruptedException { - queue.put(e); - } - - @Override - public boolean offer(MessageEvent e, long timeout, TimeUnit unit) throws InterruptedException { - return queue.offer(e, timeout, unit); - } - - @Override - public MessageEvent take() throws InterruptedException { - return queue.take(); - } - - @Override - public MessageEvent poll(long timeout, TimeUnit unit) throws InterruptedException { - return queue.poll(timeout, unit); - } - - @Override - public int remainingCapacity() { - return queue.remainingCapacity(); - } - - @Override - public boolean remove(Object o) { - return queue.remove(o); - } - - @Override - public boolean contains(Object o) { - return queue.contains(o); - } - - @Override - public int drainTo(Collection c) { - return queue.drainTo(c); - } - - @Override - public int drainTo(Collection c, int maxElements) { - return queue.drainTo(c, maxElements); - } - -} diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/NioSctpChannelConfig.java b/transport-sctp/src/main/java/io/netty/channel/sctp/NioSctpChannelConfig.java index c2344793b1..dfa1be49c7 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/NioSctpChannelConfig.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/NioSctpChannelConfig.java @@ -17,6 +17,7 @@ package io.netty.channel.sctp; import io.netty.channel.ReceiveBufferSizePredictor; import io.netty.channel.ReceiveBufferSizePredictorFactory; +import io.netty.channel.socket.nio.NioChannelConfig; /** * A {@link io.netty.channel.sctp.SctpChannelConfig} for a NIO SCTP/IP {@link io.netty.channel.sctp.SctpChannel}. @@ -43,48 +44,7 @@ import io.netty.channel.ReceiveBufferSizePredictorFactory; * * */ -public interface NioSctpChannelConfig extends SctpChannelConfig { - - /** - * Returns the high water mark of the write buffer. If the number of bytes - * queued in the write buffer exceeds this value, {@link io.netty.channel.Channel#isWritable()} - * will start to return {@code false}. - */ - int getWriteBufferHighWaterMark(); - - /** - * Sets the high water mark of the write buffer. If the number of bytes - * queued in the write buffer exceeds this value, {@link io.netty.channel.Channel#isWritable()} - * will start to return {@code false}. - */ - void setWriteBufferHighWaterMark(int writeBufferHighWaterMark); - - /** - * Returns the low water mark of the write buffer. Once the number of bytes - * queued in the write buffer exceeded the - * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then - * dropped down below this value, {@link io.netty.channel.Channel#isWritable()} will return - * {@code true} again. - */ - int getWriteBufferLowWaterMark(); - - /** - * Sets the low water mark of the write buffer. Once the number of bytes - * queued in the write buffer exceeded the - * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then - * dropped down below this value, {@link io.netty.channel.Channel#isWritable()} will return - * {@code true} again. - */ - void setWriteBufferLowWaterMark(int writeBufferLowWaterMark); - - /** - * Returns the maximum loop count for a write operation until - * {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)} returns a non-zero value. - * It is similar to what a spin lock is used for in concurrency programming. - * It improves memory utilization and write throughput depending on - * the platform that JVM runs on. The default value is {@code 16}. - */ - int getWriteSpinCount(); +public interface NioSctpChannelConfig extends SctpChannelConfig, NioChannelConfig { /** * Sets the maximum loop count for a write operation until diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpAcceptedChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpAcceptedChannel.java index 7996280614..8e9bca344f 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpAcceptedChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpAcceptedChannel.java @@ -27,17 +27,13 @@ import static io.netty.channel.Channels.*; */ final class SctpAcceptedChannel extends SctpChannelImpl { - final Thread bossThread; - SctpAcceptedChannel( ChannelFactory factory, ChannelPipeline pipeline, Channel parent, ChannelSink sink, - SctpChannel socket, SctpWorker worker, Thread bossThread) { + SctpChannel socket, SctpWorker worker) { super(parent, factory, pipeline, sink, socket, worker); - this.bossThread = bossThread; - setConnected(); fireChannelOpen(this); fireChannelBound(this, getLocalAddress()); diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelImpl.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelImpl.java index 2ace041ebe..6c55a1bd83 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelImpl.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelImpl.java @@ -15,22 +15,7 @@ */ package io.netty.channel.sctp; -import static io.netty.channel.Channels.*; - -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import com.sun.nio.sctp.Association; - -import io.netty.channel.AbstractChannel; +import static io.netty.channel.Channels.future; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFuture; @@ -38,12 +23,22 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelSink; import io.netty.channel.MessageEvent; -import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer; -import io.netty.util.internal.ThreadLocalBoolean; +import io.netty.channel.sctp.SctpSendBufferPool.SctpSendBuffer; +import io.netty.channel.socket.nio.AbstractNioChannel; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.Queue; +import java.util.Set; + +import com.sun.nio.sctp.Association; /** */ -class SctpChannelImpl extends AbstractChannel implements SctpChannel { +class SctpChannelImpl extends AbstractNioChannel implements SctpChannel { private static final int ST_OPEN = 0; private static final int ST_BOUND = 1; @@ -51,35 +46,14 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel { private static final int ST_CLOSED = -1; volatile int state = ST_OPEN; - final com.sun.nio.sctp.SctpChannel channel; - final SctpWorker worker; private final NioSctpChannelConfig config; - private volatile InetSocketAddress localAddress; - private volatile InetSocketAddress remoteAddress; - - final Object interestOpsLock = new Object(); - final Object writeLock = new Object(); - - final Runnable writeTask = new WriteTask(); - final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); - - final Queue writeBuffer = new WriteRequestQueue(); - final AtomicInteger writeBufferSize = new AtomicInteger(); - final AtomicInteger highWaterMarkCounter = new AtomicInteger(); - boolean inWriteNowLoop; - boolean writeSuspended; - - MessageEvent currentWriteEvent; - SendBuffer currentWriteBuffer; final SctpNotificationHandler notificationHandler = new SctpNotificationHandler(this); public SctpChannelImpl(Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, com.sun.nio.sctp.SctpChannel channel, SctpWorker worker) { - super(parent, factory, pipeline, sink); + super(parent, factory, pipeline, sink, worker, new SctpJdkChannel(channel)); - this.channel = channel; - this.worker = worker; config = new DefaultNioSctpChannelConfig(channel); getCloseFuture().addListener(new ChannelFutureListener() { @@ -90,31 +64,76 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel { }); } + Queue getWriteBufferQueue() { + return writeBufferQueue; + } + + Object getWriteLock() { + return writeLock; + } + + Object getInterestedOpsLock() { + return interestOpsLock; + } + + + void setWriteSuspended(boolean writeSuspended) { + this.writeSuspended = writeSuspended; + } + + boolean getWriteSuspended() { + return writeSuspended; + } + + void setInWriteNowLoop(boolean inWriteNowLoop) { + this.inWriteNowLoop = inWriteNowLoop; + } + + MessageEvent getCurrentWriteEvent() { + return currentWriteEvent; + } + + void setCurrentWriteEvent(MessageEvent currentWriteEvent) { + this.currentWriteEvent = currentWriteEvent; + } + + int getRawInterestOps() { + return super.getInterestOps(); + } + + void setRawInterestOpsNow(int interestOps) { + super.setInterestOpsNow(interestOps); + } + + SctpSendBuffer getCurrentWriteBuffer() { + return (SctpSendBuffer) currentWriteBuffer; + } + + void setCurrentWriteBuffer(SctpSendBuffer currentWriteBuffer) { + this.currentWriteBuffer = currentWriteBuffer; + } + + @Override + public SctpWorker getWorker() { + return (SctpWorker) super.getWorker(); + } + + @Override public NioSctpChannelConfig getConfig() { return config; } @Override - public InetSocketAddress getLocalAddress() { - InetSocketAddress localAddress = this.localAddress; - if (localAddress == null) { - try { - final Iterator iterator = channel.getAllLocalAddresses().iterator(); - if (iterator.hasNext()) { - this.localAddress = localAddress = (InetSocketAddress) iterator.next(); - } - } catch (Throwable t) { - return null; - } - } - return localAddress; + public SctpJdkChannel getJdkChannel() { + return (SctpJdkChannel) super.getJdkChannel(); } + @Override public Set getAllLocalAddresses() { try { - final Set allLocalAddresses = channel.getAllLocalAddresses(); + final Set allLocalAddresses = getJdkChannel().getChannel().getAllLocalAddresses(); final Set addresses = new HashSet(allLocalAddresses.size()); for (SocketAddress socketAddress: allLocalAddresses) { addresses.add((InetSocketAddress) socketAddress); @@ -125,26 +144,10 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel { } } - @Override - public InetSocketAddress getRemoteAddress() { - InetSocketAddress remoteAddress = this.remoteAddress; - if (remoteAddress == null) { - try { - final Iterator iterator = channel.getRemoteAddresses().iterator(); - if (iterator.hasNext()) { - this.remoteAddress = remoteAddress = (InetSocketAddress) iterator.next(); - } - } catch (Throwable t) { - return null; - } - } - return remoteAddress; - } - @Override public Set getAllRemoteAddresses() { try { - final Set allLocalAddresses = channel.getRemoteAddresses(); + final Set allLocalAddresses = getJdkChannel().getChannel().getRemoteAddresses(); final Set addresses = new HashSet(allLocalAddresses.size()); for (SocketAddress socketAddress: allLocalAddresses) { addresses.add((InetSocketAddress) socketAddress); @@ -172,7 +175,7 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel { @Override public Association association() { try { - return channel.association(); + return getJdkChannel().getChannel().association(); } catch (Throwable e) { return null; } @@ -198,7 +201,7 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel { state = ST_BOUND; } - final void setConnected() { + protected final void setConnected() { if (state != ST_CLOSED) { state = ST_CONNECTED; } @@ -208,126 +211,20 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel { protected boolean setClosed() { return super.setClosed(); } - + @Override - public int getInterestOps() { - if (!isOpen()) { - return Channel.OP_WRITE; - } - - int interestOps = getRawInterestOps(); - int writeBufferSize = this.writeBufferSize.get(); - if (writeBufferSize != 0) { - if (highWaterMarkCounter.get() > 0) { - int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); - if (writeBufferSize >= lowWaterMark) { - interestOps |= Channel.OP_WRITE; - } else { - interestOps &= ~Channel.OP_WRITE; - } - } else { - int highWaterMark = getConfig().getWriteBufferHighWaterMark(); - if (writeBufferSize >= highWaterMark) { - interestOps |= Channel.OP_WRITE; - } else { - interestOps &= ~Channel.OP_WRITE; + protected WriteRequestQueue createRequestQueue() { + return new WriteRequestQueue() { + + @Override + protected int getMessageSize(MessageEvent e) { + Object m = e.getMessage(); + if (m instanceof SctpFrame) { + return ((SctpFrame) m).getPayloadBuffer().readableBytes(); } + return 0; } - } else { - interestOps &= ~Channel.OP_WRITE; - } - - return interestOps; + }; } - int getRawInterestOps() { - return super.getInterestOps(); - } - - void setRawInterestOpsNow(int interestOps) { - super.setInterestOpsNow(interestOps); - } - - @Override - public ChannelFuture write(Object message, SocketAddress remoteAddress) { - if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) { - return super.write(message, null); - } else { - return getUnsupportedOperationFuture(); - } - } - - private final class WriteRequestQueue extends AbstractWriteRequestQueue { - - private final ThreadLocalBoolean notifying = new ThreadLocalBoolean(); - - WriteRequestQueue() { - super(); - } - - @Override - public boolean offer(MessageEvent e) { - boolean success = queue.offer(e); - assert success; - - int messageSize = getMessageSize(e); - int newWriteBufferSize = writeBufferSize.addAndGet(messageSize); - int highWaterMark = getConfig().getWriteBufferHighWaterMark(); - - if (newWriteBufferSize >= highWaterMark) { - if (newWriteBufferSize - messageSize < highWaterMark) { - highWaterMarkCounter.incrementAndGet(); - if (!notifying.get()) { - notifying.set(Boolean.TRUE); - fireChannelInterestChanged(SctpChannelImpl.this); - notifying.set(Boolean.FALSE); - } - } - } - return true; - } - - @Override - public MessageEvent poll() { - MessageEvent e = queue.poll(); - if (e != null) { - int messageSize = getMessageSize(e); - int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize); - int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); - - if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) { - if (newWriteBufferSize + messageSize >= lowWaterMark) { - highWaterMarkCounter.decrementAndGet(); - if (isConnected() && !notifying.get()) { - notifying.set(Boolean.TRUE); - fireChannelInterestChanged(SctpChannelImpl.this); - notifying.set(Boolean.FALSE); - } - } - } - } - return e; - } - - private int getMessageSize(MessageEvent e) { - Object m = e.getMessage(); - if (m instanceof SctpFrame) { - return ((SctpFrame) m).getPayloadBuffer().readableBytes(); - } - return 0; - } - } - - private final class WriteTask implements Runnable { - - WriteTask() { - super(); - } - - @Override - public void run() { - writeTaskInTaskQueue.set(false); - worker.writeFromTaskLoop(SctpChannelImpl.this); - } - } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java index 1e664a0120..9bcc85f908 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java @@ -15,56 +15,25 @@ */ package io.netty.channel.sctp; -import static io.netty.channel.Channels.*; - -import java.io.IOException; -import java.net.ConnectException; -import java.net.InetAddress; -import java.net.SocketAddress; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.util.Iterator; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - +import static io.netty.channel.Channels.fireChannelBound; +import static io.netty.channel.Channels.fireExceptionCaught; +import static io.netty.channel.Channels.succeededFuture; import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelState; import io.netty.channel.ChannelStateEvent; import io.netty.channel.MessageEvent; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; -import io.netty.util.internal.DeadLockProofWorker; -import io.netty.util.internal.QueueFactory; +import io.netty.channel.socket.nio.AbstractNioChannelSink; + +import java.net.InetAddress; +import java.net.SocketAddress; +import java.nio.channels.ClosedChannelException; /** */ -class SctpClientPipelineSink extends AbstractSctpChannelSink { - - static final InternalLogger logger = - InternalLoggerFactory.getInstance(SctpClientPipelineSink.class); - - final Executor bossExecutor; - private final Boss boss = new Boss(); - private final SctpWorker[] workers; - private final AtomicInteger workerIndex = new AtomicInteger(); - - SctpClientPipelineSink( - Executor bossExecutor, Executor workerExecutor, int workerCount) { - this.bossExecutor = bossExecutor; - workers = new SctpWorker[workerCount]; - for (int i = 0; i < workers.length; i ++) { - workers[i] = new SctpWorker(workerExecutor); - } - } +class SctpClientPipelineSink extends AbstractNioChannelSink { @Override public void eventSunk( @@ -80,21 +49,21 @@ class SctpClientPipelineSink extends AbstractSctpChannelSink { switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { - channel.worker.close(channel, future); + channel.getWorker().close(channel, future); } break; case BOUND: if (value != null) { bind(channel, future, (SocketAddress) value); } else { - channel.worker.close(channel, future); + channel.getWorker().close(channel, future); } break; case CONNECTED: if (value != null) { connect(channel, future, (SocketAddress) value); } else { - channel.worker.close(channel, future); + channel.getWorker().close(channel, future); } break; case INTEREST_OPS: @@ -105,16 +74,16 @@ class SctpClientPipelineSink extends AbstractSctpChannelSink { SctpUnbindAddressEvent unbindAddressEvent = (SctpUnbindAddressEvent) event; unbindAddress(channel, unbindAddressEvent.getFuture(), unbindAddressEvent.getValue()); } else { - channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); + channel.getWorker().setInterestOps(channel, future, ((Integer) value).intValue()); } break; } } else if (e instanceof MessageEvent) { MessageEvent event = (MessageEvent) e; SctpChannelImpl channel = (SctpChannelImpl) event.getChannel(); - boolean offered = channel.writeBuffer.offer(event); + boolean offered = channel.getWriteBufferQueue().offer(event); assert offered; - channel.worker.writeFromUserCode(channel); + channel.getWorker().writeFromUserCode(channel); } } @@ -122,7 +91,7 @@ class SctpClientPipelineSink extends AbstractSctpChannelSink { SctpClientChannel channel, ChannelFuture future, SocketAddress localAddress) { try { - channel.channel.bind(localAddress); + channel.getJdkChannel().bind(localAddress); channel.boundManually = true; channel.setBound(); future.setSuccess(); @@ -137,7 +106,7 @@ class SctpClientPipelineSink extends AbstractSctpChannelSink { SctpClientChannel channel, ChannelFuture future, InetAddress localAddress) { try { - channel.channel.bindAddress(localAddress); + channel.getJdkChannel().getChannel().bindAddress(localAddress); future.setSuccess(); } catch (Throwable t) { future.setFailure(t); @@ -149,7 +118,7 @@ class SctpClientPipelineSink extends AbstractSctpChannelSink { SctpClientChannel channel, ChannelFuture future, InetAddress localAddress) { try { - channel.channel.unbindAddress(localAddress); + channel.getJdkChannel().getChannel().unbindAddress(localAddress); future.setSuccess(); } catch (Throwable t) { future.setFailure(t); @@ -163,293 +132,28 @@ class SctpClientPipelineSink extends AbstractSctpChannelSink { final SctpClientChannel channel, final ChannelFuture cf, SocketAddress remoteAddress) { try { - if (channel.channel.connect(remoteAddress)) { - channel.worker.register(channel, cf); - } else { - channel.getCloseFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture f) - throws Exception { - if (!cf.isDone()) { - cf.setFailure(new ClosedChannelException()); - } + channel.getJdkChannel().connect(remoteAddress); + + channel.getCloseFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) + throws Exception { + if (!cf.isDone()) { + cf.setFailure(new ClosedChannelException()); } - }); - cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - channel.connectFuture = cf; - boss.register(channel); - } + } + }); + cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + channel.connectFuture = cf; + channel.getWorker().registerWithWorker(channel, cf); + } catch (Throwable t) { cf.setFailure(t); fireExceptionCaught(channel, t); - channel.worker.close(channel, succeededFuture(channel)); + channel.getWorker().close(channel, succeededFuture(channel)); } } - SctpWorker nextWorker() { - return workers[Math.abs( - workerIndex.getAndIncrement() % workers.length)]; - } - - private final class Boss implements Runnable { - - volatile Selector selector; - private boolean started; - private final AtomicBoolean wakenUp = new AtomicBoolean(); - private final Object startStopLock = new Object(); - private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class); - - Boss() { - super(); - } - - void register(SctpClientChannel channel) { - Runnable registerTask = new RegisterTask(this, channel); - Selector selector; - - synchronized (startStopLock) { - if (!started) { - // Open a selector if this worker didn't start yet. - try { - this.selector = selector = Selector.open(); - } catch (Throwable t) { - throw new ChannelException( - "Failed to create a selector.", t); - } - - // Start the worker thread with the new Selector. - boolean success = false; - try { - DeadLockProofWorker.start(bossExecutor, this); - success = true; - } finally { - if (!success) { - // Release the Selector if the execution fails. - try { - selector.close(); - } catch (Throwable t) { - logger.warn("Failed to close a selector.", t); - } - this.selector = selector = null; - // The method will return to the caller at this point. - } - } - } else { - // Use the existing selector if this worker has been started. - selector = this.selector; - } - - assert selector != null && selector.isOpen(); - - started = true; - boolean offered = registerTaskQueue.offer(registerTask); - assert offered; - } - - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - } - - @Override - public void run() { - boolean shutdown = false; - Selector selector = this.selector; - long lastConnectTimeoutCheckTimeNanos = System.nanoTime(); - for (;;) { - wakenUp.set(false); - - try { - int selectedKeyCount = selector.select(10); - - // 'wakenUp.compareAndSet(false, true)' is always evaluated - // before calling 'selector.wakeup()' to reduce the wake-up - // overhead. (Selector.wakeup() is an expensive operation.) - // - // However, there is a race condition in this approach. - // The race condition is triggered when 'wakenUp' is set to - // true too early. - // - // 'wakenUp' is set to true too early if: - // 1) Selector is waken up between 'wakenUp.set(false)' and - // 'selector.select(...)'. (BAD) - // 2) Selector is waken up between 'selector.select(...)' and - // 'if (wakenUp.get()) { ... }'. (OK) - // - // In the first case, 'wakenUp' is set to true and the - // following 'selector.select(...)' will wake up immediately. - // Until 'wakenUp' is set to false again in the next round, - // 'wakenUp.compareAndSet(false, true)' will fail, and therefore - // any attempt to wake up the Selector will fail, too, causing - // the following 'selector.select(...)' call to block - // unnecessarily. - // - // To fix this problem, we wake up the selector again if wakenUp - // is true immediately after selector.select(...). - // It is inefficient in that it wakes up the selector for both - // the first case (BAD - wake-up required) and the second case - // (OK - no wake-up required). - - if (wakenUp.get()) { - selector.wakeup(); - } - - processRegisterTaskQueue(); - - if (selectedKeyCount > 0) { - processSelectedKeys(selector.selectedKeys()); - } - - // Handle connection timeout every 10 milliseconds approximately. - long currentTimeNanos = System.nanoTime(); - if (currentTimeNanos - lastConnectTimeoutCheckTimeNanos >= 10 * 1000000L) { - lastConnectTimeoutCheckTimeNanos = currentTimeNanos; - processConnectTimeout(selector.keys(), currentTimeNanos); - } - - // Exit the loop when there's nothing to handle. - // The shutdown flag is used to delay the shutdown of this - // loop to avoid excessive Selector creation when - // connection attempts are made in a one-by-one manner - // instead of concurrent manner. - if (selector.keys().isEmpty()) { - if (shutdown || - bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) { - - synchronized (startStopLock) { - if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) { - started = false; - try { - selector.close(); - } catch (IOException e) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to close a selector.", e); - } - } finally { - this.selector = null; - } - break; - } else { - shutdown = false; - } - } - } else { - // Give one more second. - shutdown = true; - } - } else { - shutdown = false; - } - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn( - "Unexpected exception in the selector loop.", t); - } - - // Prevent possible consecutive immediate failures. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore. - } - } - } - } - - private void processRegisterTaskQueue() { - for (;;) { - final Runnable task = registerTaskQueue.poll(); - if (task == null) { - break; - } - - task.run(); - } - } - - private void processSelectedKeys(Set selectedKeys) { - for (Iterator i = selectedKeys.iterator(); i.hasNext();) { - SelectionKey k = i.next(); - i.remove(); - - if (!k.isValid()) { - close(k); - continue; - } - - if (k.isConnectable()) { - connect(k); - } - } - } - - private void processConnectTimeout(Set keys, long currentTimeNanos) { - ConnectException cause = null; - for (SelectionKey k: keys) { - if (!k.isValid()) { - continue; - } - - SctpClientChannel ch = (SctpClientChannel) k.attachment(); - if (ch.connectDeadlineNanos > 0 && - currentTimeNanos >= ch.connectDeadlineNanos) { - - if (cause == null) { - cause = new ConnectException("connection timed out"); - } - - ch.connectFuture.setFailure(cause); - fireExceptionCaught(ch, cause); - ch.worker.close(ch, succeededFuture(ch)); - } - } - } - - private void connect(SelectionKey k) { - SctpClientChannel ch = (SctpClientChannel) k.attachment(); - try { - if (ch.channel.finishConnect()) { - k.cancel(); - ch.worker.register(ch, ch.connectFuture); - } - } catch (Throwable t) { - ch.connectFuture.setFailure(t); - fireExceptionCaught(ch, t); - k.cancel(); // Some JDK implementations run into an infinite loop without this. - ch.worker.close(ch, succeededFuture(ch)); - } - } - - private void close(SelectionKey k) { - SctpClientChannel ch = (SctpClientChannel) k.attachment(); - ch.worker.close(ch, succeededFuture(ch)); - } - } - - private static final class RegisterTask implements Runnable { - private final Boss boss; - private final SctpClientChannel channel; - - RegisterTask(Boss boss, SctpClientChannel channel) { - this.boss = boss; - this.channel = channel; - } - - @Override - public void run() { - try { - channel.channel.register( - boss.selector, SelectionKey.OP_CONNECT, channel); - } catch (ClosedChannelException e) { - channel.worker.close(channel, succeededFuture(channel)); - } - - int connectTimeout = channel.getConfig().getConnectTimeoutMillis(); - if (connectTimeout > 0) { - channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L; - } - } - } + } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientSocketChannelFactory.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientSocketChannelFactory.java index 151e490b64..9fe1b23a3f 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientSocketChannelFactory.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientSocketChannelFactory.java @@ -17,7 +17,10 @@ package io.netty.channel.sctp; import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelPipeline; -import io.netty.util.internal.ExecutorUtil; +import io.netty.channel.ChannelSink; +import io.netty.channel.socket.nio.SelectorUtil; +import io.netty.channel.socket.nio.WorkerPool; +import io.netty.util.ExternalResourceReleasable; import java.util.concurrent.Executor; @@ -74,9 +77,8 @@ import java.util.concurrent.Executor; */ public class SctpClientSocketChannelFactory implements ChannelFactory { - private final Executor bossExecutor; - private final Executor workerExecutor; - private final SctpClientPipelineSink sink; + private final WorkerPool workerPool; + private final ChannelSink sink; /** * Creates a new instance. Calling this constructor is same with calling @@ -84,53 +86,45 @@ public class SctpClientSocketChannelFactory implements ChannelFactory { * the number of available processors in the machine. The number of * available processors is obtained by {@link Runtime#availableProcessors()}. * - * @param bossExecutor - * the {@link java.util.concurrent.Executor} which will execute the boss thread * @param workerExecutor * the {@link java.util.concurrent.Executor} which will execute the I/O worker threads */ - public SctpClientSocketChannelFactory( - Executor bossExecutor, Executor workerExecutor) { - this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); + public SctpClientSocketChannelFactory(Executor workerExecutor) { + this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); } /** * Creates a new instance. - * - * @param bossExecutor - * the {@link java.util.concurrent.Executor} which will execute the boss thread + * * @param workerExecutor * the {@link java.util.concurrent.Executor} which will execute the I/O worker threads * @param workerCount * the maximum number of I/O worker threads */ - public SctpClientSocketChannelFactory( - Executor bossExecutor, Executor workerExecutor, + public SctpClientSocketChannelFactory(Executor workerExecutor, int workerCount) { - if (bossExecutor == null) { - throw new NullPointerException("bossExecutor"); + this(new SctpWorkerPool(workerExecutor, workerCount, true)); + } + + public SctpClientSocketChannelFactory(WorkerPool workerPool) { + if (workerPool == null) { + throw new NullPointerException("workerPool"); } - if (workerExecutor == null) { - throw new NullPointerException("workerExecutor"); - } - if (workerCount <= 0) { - throw new IllegalArgumentException( - "workerCount (" + workerCount + ") " + - "must be a positive integer."); - } - - this.bossExecutor = bossExecutor; - this.workerExecutor = workerExecutor; - sink = new SctpClientPipelineSink(bossExecutor, workerExecutor, workerCount); + + this.workerPool = workerPool; + sink = new SctpClientPipelineSink(); } @Override public SctpChannel newChannel(ChannelPipeline pipeline) { - return new SctpClientChannel(this, pipeline, sink, sink.nextWorker()); + return new SctpClientChannel(this, pipeline, sink, workerPool.nextWorker()); } @Override public void releaseExternalResources() { - ExecutorUtil.terminate(bossExecutor, workerExecutor); + if (workerPool instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) workerPool).releaseExternalResources(); + } + } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpJdkChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpJdkChannel.java new file mode 100644 index 0000000000..33bc3b3b88 --- /dev/null +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpJdkChannel.java @@ -0,0 +1,109 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel.sctp; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +import com.sun.nio.sctp.SctpChannel; + +import io.netty.channel.socket.nio.AbstractJdkChannel; + +public class SctpJdkChannel extends AbstractJdkChannel { + + SctpJdkChannel(SctpChannel channel) { + super(channel); + } + + @Override + protected SctpChannel getChannel() { + return (SctpChannel) super.getChannel(); + } + + @Override + public InetSocketAddress getRemoteSocketAddress() { + try { + for (SocketAddress address : getChannel().getRemoteAddresses()) { + return (InetSocketAddress) address; + } + } catch (IOException e) { + // ignore + } + return null; + } + + @Override + public SocketAddress getLocalSocketAddress() { + try { + for (SocketAddress address : getChannel().getAllLocalAddresses()) { + return (InetSocketAddress) address; + } + } catch (IOException e) { + // ignore + } + return null; + } + + @Override + public boolean isConnected() { + return getChannel().isOpen(); + } + + @Override + public boolean isSocketBound() { + try { + return !getChannel().getAllLocalAddresses().isEmpty(); + } catch (IOException e) { + return false; + } + } + + @Override + public void disconnectSocket() throws IOException { + closeSocket(); + } + + @Override + public void closeSocket() throws IOException { + for (SocketAddress address: getChannel().getAllLocalAddresses()) { + getChannel().unbindAddress(((InetSocketAddress) address).getAddress()); + } + } + + @Override + public void bind(SocketAddress local) throws IOException { + getChannel().bind(local); + } + + @Override + public void connect(SocketAddress remote) throws IOException { + getChannel().connect(remote); + } + + @Override + public int write(ByteBuffer src) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean finishConnect() throws IOException { + return getChannel().finishConnect(); + } + +} diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpNotificationHandler.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpNotificationHandler.java index 65f2502040..9a4ec240ec 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpNotificationHandler.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpNotificationHandler.java @@ -59,7 +59,7 @@ class SctpNotificationHandler extends AbstractNotificationHandler { @Override public HandlerResult handleNotification(ShutdownNotification notification, Object o) { - sctpChannel.worker.close(sctpChannel, Channels.succeededFuture(sctpChannel)); + sctpChannel.getWorker().close(sctpChannel, Channels.succeededFuture(sctpChannel)); return HandlerResult.RETURN; } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpProviderMetadata.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpProviderMetadata.java deleted file mode 100644 index cafe5c30b4..0000000000 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpProviderMetadata.java +++ /dev/null @@ -1,449 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.sctp; - -import com.sun.nio.sctp.SctpServerChannel; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; -import io.netty.util.internal.SystemPropertyUtil; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.spi.SelectorProvider; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Provides information which is specific to a NIO service provider - * implementation. - */ -final class SctpProviderMetadata { - static final InternalLogger logger = - InternalLoggerFactory.getInstance(SctpProviderMetadata.class); - - private static final String CONSTRAINT_LEVEL_PROPERTY = - "io.netty.channel.sctp.constraintLevel"; - - /** - * 0 - no need to wake up to get / set interestOps (most cases) - * 1 - no need to wake up to get interestOps, but need to wake up to set. - * 2 - need to wake up to get / set interestOps (old providers) - */ - static final int CONSTRAINT_LEVEL; - - static { - int constraintLevel = -1; - - // Use the system property if possible. - constraintLevel = SystemPropertyUtil.get(CONSTRAINT_LEVEL_PROPERTY, -1); - if (constraintLevel < 0 || constraintLevel > 2) { - constraintLevel = -1; - } - - if (constraintLevel >= 0) { - if (logger.isDebugEnabled()) { - logger.debug( - "Setting the NIO constraint level to: " + constraintLevel); - } - } - - if (constraintLevel < 0) { - constraintLevel = detectConstraintLevelFromSystemProperties(); - - if (constraintLevel < 0) { - constraintLevel = 2; - if (logger.isDebugEnabled()) { - logger.debug( - "Couldn't determine the NIO constraint level from " + - "the system properties; using the safest level (2)"); - } - } else if (constraintLevel != 0) { - if (logger.isInfoEnabled()) { - logger.info( - "Using the autodetected NIO constraint level: " + - constraintLevel + - " (Use better NIO provider for better performance)"); - } - } else { - if (logger.isDebugEnabled()) { - logger.debug( - "Using the autodetected NIO constraint level: " + - constraintLevel); - } - } - } - - CONSTRAINT_LEVEL = constraintLevel; - - if (CONSTRAINT_LEVEL < 0 || CONSTRAINT_LEVEL > 2) { - throw new Error( - "Unexpected NIO constraint level: " + - CONSTRAINT_LEVEL + ", please report this error."); - } - } - - private static int detectConstraintLevelFromSystemProperties() { - String version = SystemPropertyUtil.get("java.specification.version"); - String vminfo = SystemPropertyUtil.get("java.vm.info", ""); - String os = SystemPropertyUtil.get("os.name"); - String vendor = SystemPropertyUtil.get("java.vm.vendor"); - String provider; - try { - provider = SelectorProvider.provider().getClass().getName(); - } catch (Exception e) { - // Perhaps security exception. - provider = null; - } - - if (version == null || os == null || vendor == null || provider == null) { - return -1; - } - - os = os.toLowerCase(); - vendor = vendor.toLowerCase(); - -// System.out.println(version); -// System.out.println(vminfo); -// System.out.println(os); -// System.out.println(vendor); -// System.out.println(provider); - - // Sun JVM - if (vendor.indexOf("sun") >= 0) { - // Linux - if (os.indexOf("linux") >= 0) { - if (provider.equals("sun.nio.ch.EPollSelectorProvider") || - provider.equals("sun.nio.ch.PollSelectorProvider")) { - return 0; - } - - // Windows - } else if (os.indexOf("windows") >= 0) { - if (provider.equals("sun.nio.ch.WindowsSelectorProvider")) { - return 0; - } - - // Solaris - } else if (os.indexOf("sun") >= 0 || os.indexOf("solaris") >= 0) { - if (provider.equals("sun.nio.ch.DevPollSelectorProvider")) { - return 0; - } - } - // Apple JVM - } else if (vendor.indexOf("apple") >= 0) { - // Mac OS - if (os.indexOf("mac") >= 0 && os.indexOf("os") >= 0) { - if (provider.equals("sun.nio.ch.KQueueSelectorProvider")) { - return 0; - } - } - // IBM - } else if (vendor.indexOf("ibm") >= 0) { - // Linux or AIX - if (os.indexOf("linux") >= 0 || os.indexOf("aix") >= 0) { - if (version.equals("1.5") || version.matches("^1\\.5\\D.*$")) { - if (provider.equals("sun.nio.ch.PollSelectorProvider")) { - return 1; - } - } else if (version.equals("1.6") || version.matches("^1\\.6\\D.*$")) { - // IBM JDK 1.6 has different constraint level for different - // version. The exact version can be determined only by its - // build date. - Pattern datePattern = Pattern.compile( - "(?:^|[^0-9])(" + - "[2-9][0-9]{3}" + // year - "(?:0[1-9]|1[0-2])" + // month - "(?:0[1-9]|[12][0-9]|3[01])" + // day of month - ")(?:$|[^0-9])"); - - Matcher dateMatcher = datePattern.matcher(vminfo); - if (dateMatcher.find()) { - long dateValue = Long.parseLong(dateMatcher.group(1)); - if (dateValue < 20081105L) { - // SR0, 1, and 2 - return 2; - } else { - // SR3 and later - if (provider.equals("sun.nio.ch.EPollSelectorProvider")) { - return 0; - } else if (provider.equals("sun.nio.ch.PollSelectorProvider")) { - return 1; - } - } - } - } - } - // BEA - } else if (vendor.indexOf("bea") >= 0 || vendor.indexOf("oracle") >= 0) { - // Linux - if (os.indexOf("linux") >= 0) { - if (provider.equals("sun.nio.ch.EPollSelectorProvider") || - provider.equals("sun.nio.ch.PollSelectorProvider")) { - return 0; - } - - // Windows - } else if (os.indexOf("windows") >= 0) { - if (provider.equals("sun.nio.ch.WindowsSelectorProvider")) { - return 0; - } - } - // Apache Software Foundation - } else if (vendor.indexOf("apache") >= 0) { - if (provider.equals("org.apache.harmony.nio.internal.SelectorProviderImpl")) { - return 1; - } - } - - // Others (untested) - return -1; - } - - private static final class ConstraintLevelAutodetector { - - ConstraintLevelAutodetector() { - super(); - } - - int autodetect() { - final int constraintLevel; - ExecutorService executor = Executors.newCachedThreadPool(); - boolean success; - long startTime; - int interestOps; - - SctpServerChannel ch = null; - SelectorLoop loop = null; - - try { - // Open a channel. - ch = com.sun.nio.sctp.SctpServerChannel.open(); - - // Configure the channel - try { - ch.bind(new InetSocketAddress(0)); - ch.configureBlocking(false); - } catch (Throwable e) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to configure a temporary socket.", e); - } - return -1; - } - - // Prepare the selector loop. - try { - loop = new SelectorLoop(); - } catch (Throwable e) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to open a temporary selector.", e); - } - return -1; - } - - // Register the channel - try { - ch.register(loop.selector, 0); - } catch (Throwable e) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to register a temporary selector.", e); - } - return -1; - } - - SelectionKey key = ch.keyFor(loop.selector); - - // Start the selector loop. - executor.execute(loop); - - // Level 0 - success = true; - for (int i = 0; i < 10; i ++) { - - // Increase the probability of calling interestOps - // while select() is running. - do { - while (!loop.selecting) { - Thread.yield(); - } - - // Wait a little bit more. - try { - Thread.sleep(50); - } catch (InterruptedException e) { - // Ignore - } - } while (!loop.selecting); - - startTime = System.nanoTime(); - key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT); - key.interestOps(key.interestOps() & ~SelectionKey.OP_ACCEPT); - - if (System.nanoTime() - startTime >= 500000000L) { - success = false; - break; - } - } - - if (success) { - constraintLevel = 0; - } else { - // Level 1 - success = true; - for (int i = 0; i < 10; i ++) { - - // Increase the probability of calling interestOps - // while select() is running. - do { - while (!loop.selecting) { - Thread.yield(); - } - - // Wait a little bit more. - try { - Thread.sleep(50); - } catch (InterruptedException e) { - // Ignore - } - } while (!loop.selecting); - - startTime = System.nanoTime(); - interestOps = key.interestOps(); - synchronized (loop) { - loop.selector.wakeup(); - key.interestOps(interestOps | SelectionKey.OP_ACCEPT); - key.interestOps(interestOps & ~SelectionKey.OP_ACCEPT); - } - - if (System.nanoTime() - startTime >= 500000000L) { - success = false; - break; - } - } - if (success) { - constraintLevel = 1; - } else { - constraintLevel = 2; - } - } - } catch (Throwable e) { - return -1; - } finally { - if (ch != null) { - try { - ch.close(); - } catch (Throwable e) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to close a temporary socket.", e); - } - } - } - - if (loop != null) { - loop.done = true; - try { - executor.shutdownNow(); - } catch (NullPointerException ex) { - // Some JDK throws NPE here, but shouldn't. - } - - try { - for (;;) { - loop.selector.wakeup(); - try { - if (executor.awaitTermination(1, TimeUnit.SECONDS)) { - break; - } - } catch (InterruptedException e) { - // Ignore - } - } - } catch (Throwable e) { - // Perhaps security exception. - } - - try { - loop.selector.close(); - } catch (Throwable e) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to close a temporary selector.", e); - } - } - } - } - - return constraintLevel; - } - } - - private static final class SelectorLoop implements Runnable { - final Selector selector; - volatile boolean done; - volatile boolean selecting; // Just an approximation - - SelectorLoop() throws IOException { - selector = Selector.open(); - } - - @Override - public void run() { - while (!done) { - synchronized (this) { - // Guard - } - try { - selecting = true; - try { - selector.select(1000); - } finally { - selecting = false; - } - - Set keys = selector.selectedKeys(); - for (SelectionKey k: keys) { - k.interestOps(0); - } - keys.clear(); - } catch (IOException e) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to wait for a temporary selector.", e); - } - } - } - } - } - - public static void main(String[] args) throws Exception { - for (Entry e: System.getProperties().entrySet()) { - System.out.println(e.getKey() + ": " + e.getValue()); - } - System.out.println(); - System.out.println("Hard-coded Constraint Level: " + CONSTRAINT_LEVEL); - System.out.println( - "Auto-detected Constraint Level: " + - new ConstraintLevelAutodetector().autodetect()); - } - - private SctpProviderMetadata() { - // Unused - } -} diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpReceiveBufferPool.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpReceiveBufferPool.java deleted file mode 100644 index b664a794bc..0000000000 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpReceiveBufferPool.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.sctp; - -import java.lang.ref.SoftReference; -import java.nio.ByteBuffer; - -final class SctpReceiveBufferPool { - - private static final int POOL_SIZE = 8; - - @SuppressWarnings("unchecked") - private final SoftReference[] pool = new SoftReference[POOL_SIZE]; - - SctpReceiveBufferPool() { - super(); - } - - ByteBuffer acquire(int size) { - final SoftReference[] pool = this.pool; - for (int i = 0; i < POOL_SIZE; i ++) { - SoftReference ref = pool[i]; - if (ref == null) { - continue; - } - - ByteBuffer buf = ref.get(); - if (buf == null) { - pool[i] = null; - continue; - } - - if (buf.capacity() < size) { - continue; - } - - pool[i] = null; - - buf.clear(); - return buf; - } - - ByteBuffer buf = ByteBuffer.allocateDirect(normalizeCapacity(size)); - buf.clear(); - return buf; - } - - void release(ByteBuffer buffer) { - final SoftReference[] pool = this.pool; - for (int i = 0; i < POOL_SIZE; i ++) { - SoftReference ref = pool[i]; - if (ref == null || ref.get() == null) { - pool[i] = new SoftReference(buffer); - return; - } - } - - // pool is full - replace one - final int capacity = buffer.capacity(); - for (int i = 0; i < POOL_SIZE; i ++) { - SoftReference ref = pool[i]; - ByteBuffer pooled = ref.get(); - if (pooled == null) { - pool[i] = null; - continue; - } - - if (pooled.capacity() < capacity) { - pool[i] = new SoftReference(buffer); - return; - } - } - } - - private static int normalizeCapacity(int capacity) { - // Normalize to multiple of 1024 - int q = capacity >>> 10; - int r = capacity & 1023; - if (r != 0) { - q ++; - } - return q << 10; - } -} diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpSendBufferPool.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpSendBufferPool.java index bbb3f07b35..b3bbb7b95e 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpSendBufferPool.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpSendBufferPool.java @@ -18,27 +18,21 @@ package io.netty.channel.sctp; import com.sun.nio.sctp.MessageInfo; import com.sun.nio.sctp.SctpChannel; import io.netty.buffer.ChannelBuffer; +import io.netty.channel.socket.nio.SendBufferPool; import java.io.IOException; -import java.lang.ref.SoftReference; +import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.WritableByteChannel; -final class SctpSendBufferPool { +final class SctpSendBufferPool extends SendBufferPool { - private static final SendBuffer EMPTY_BUFFER = new EmptySendBuffer(); + private static final SctpSendBuffer EMPTY_BUFFER = new EmptySendBuffer(); - private static final int DEFAULT_PREALLOCATION_SIZE = 65536; - private static final int ALIGN_SHIFT = 4; - private static final int ALIGN_MASK = 15; - PreallocationRef poolHead; - Preallocation current = new Preallocation(DEFAULT_PREALLOCATION_SIZE); - - SctpSendBufferPool() { - super(); - } - - SendBuffer acquire(Object message) { + @Override + public SctpSendBuffer acquire(Object message) { if (message instanceof SctpFrame) { return acquire((SctpFrame) message); } else { @@ -47,7 +41,7 @@ final class SctpSendBufferPool { } } - private SendBuffer acquire(SctpFrame message) { + private SctpSendBuffer acquire(SctpFrame message) { final ChannelBuffer src = message.getPayloadBuffer(); final int streamNo = message.getStreamIdentifier(); final int protocolId = message.getProtocolIdentifier(); @@ -58,16 +52,16 @@ final class SctpSendBufferPool { } if (src.isDirect()) { - return new UnpooledSendBuffer(streamNo, protocolId, src.toByteBuffer()); + return new SctpUnpooledSendBuffer(streamNo, protocolId, src.toByteBuffer()); } if (src.readableBytes() > DEFAULT_PREALLOCATION_SIZE) { - return new UnpooledSendBuffer(streamNo, protocolId, src.toByteBuffer()); + return new SctpUnpooledSendBuffer(streamNo, protocolId, src.toByteBuffer()); } Preallocation current = this.current; ByteBuffer buffer = current.buffer; int remaining = buffer.remaining(); - PooledSendBuffer dst; + SctpPooledSendBuffer dst; if (size < remaining) { int nextPos = buffer.position() + size; @@ -75,7 +69,7 @@ final class SctpSendBufferPool { buffer.position(align(nextPos)); slice.limit(nextPos); current.refCnt++; - dst = new PooledSendBuffer(streamNo, protocolId, current, slice); + dst = new SctpPooledSendBuffer(streamNo, protocolId, current, slice); } else if (size > remaining) { this.current = current = getPreallocation(); buffer = current.buffer; @@ -83,11 +77,11 @@ final class SctpSendBufferPool { buffer.position(align(size)); slice.limit(size); current.refCnt++; - dst = new PooledSendBuffer(streamNo, protocolId, current, slice); + dst = new SctpPooledSendBuffer(streamNo, protocolId, current, slice); } else { // size == remaining current.refCnt++; this.current = getPreallocation0(); - dst = new PooledSendBuffer(streamNo, protocolId, current, current.buffer); + dst = new SctpPooledSendBuffer(streamNo, protocolId, current, current.buffer); } ByteBuffer dstbuf = dst.buffer; @@ -97,101 +91,22 @@ final class SctpSendBufferPool { return dst; } - private Preallocation getPreallocation() { - Preallocation current = this.current; - if (current.refCnt == 0) { - current.buffer.clear(); - return current; - } - return getPreallocation0(); - } - - private Preallocation getPreallocation0() { - PreallocationRef ref = poolHead; - if (ref != null) { - do { - Preallocation p = ref.get(); - ref = ref.next; - - if (p != null) { - poolHead = ref; - return p; - } - } while (ref != null); - - poolHead = ref; - } - - return new Preallocation(DEFAULT_PREALLOCATION_SIZE); - } - - private static int align(int pos) { - int q = pos >>> ALIGN_SHIFT; - int r = pos & ALIGN_MASK; - if (r != 0) { - q++; - } - return q << ALIGN_SHIFT; - } - - private final class Preallocation { - final ByteBuffer buffer; - int refCnt; - - Preallocation(int capacity) { - buffer = ByteBuffer.allocateDirect(capacity); - } - } - - private final class PreallocationRef extends SoftReference { - final PreallocationRef next; - - PreallocationRef(Preallocation prealloation, PreallocationRef next) { - super(prealloation); - this.next = next; - } - } - - interface SendBuffer { - boolean finished(); - - long writtenBytes(); - - long totalBytes(); + interface SctpSendBuffer extends SendBuffer { long transferTo(SctpChannel ch) throws IOException; - - void release(); + } - class UnpooledSendBuffer implements SendBuffer { + class SctpUnpooledSendBuffer extends UnpooledSendBuffer implements SctpSendBuffer { - final ByteBuffer buffer; - final int initialPos; final int streamNo; final int protocolId; - UnpooledSendBuffer(int streamNo, int protocolId, ByteBuffer buffer) { + SctpUnpooledSendBuffer(int streamNo, int protocolId, ByteBuffer buffer) { + super(buffer); this.streamNo = streamNo; this.protocolId = protocolId; - this.buffer = buffer; - initialPos = buffer.position(); - } - - @Override - public final boolean finished() { - return !buffer.hasRemaining(); - } - - @Override - public final long writtenBytes() { - return buffer.position() - initialPos; - } - - @Override - public final long totalBytes() { - return buffer.limit() - initialPos; } @Override @@ -201,42 +116,17 @@ final class SctpSendBufferPool { messageInfo.streamNumber(streamNo); return ch.send(buffer, messageInfo); } - - @Override - public void release() { - // Unpooled. - } } - final class PooledSendBuffer implements SendBuffer { + final class SctpPooledSendBuffer extends PooledSendBuffer implements SctpSendBuffer { - private final Preallocation parent; - final ByteBuffer buffer; - final int initialPos; final int streamNo; final int protocolId; - PooledSendBuffer(int streamNo, int protocolId, Preallocation parent, ByteBuffer buffer) { + SctpPooledSendBuffer(int streamNo, int protocolId, Preallocation parent, ByteBuffer buffer) { + super(parent, buffer); this.streamNo = streamNo; this.protocolId = protocolId; - this.parent = parent; - this.buffer = buffer; - initialPos = buffer.position(); - } - - @Override - public boolean finished() { - return !buffer.hasRemaining(); - } - - @Override - public long writtenBytes() { - return buffer.position() - initialPos; - } - - @Override - public long totalBytes() { - return buffer.limit() - initialPos; } @Override @@ -246,20 +136,9 @@ final class SctpSendBufferPool { messageInfo.streamNumber(streamNo); return ch.send(buffer, messageInfo); } - - @Override - public void release() { - final Preallocation parent = this.parent; - if (--parent.refCnt == 0) { - parent.buffer.clear(); - if (parent != current) { - poolHead = new PreallocationRef(parent, poolHead); - } - } - } } - static final class EmptySendBuffer implements SendBuffer { + static final class EmptySendBuffer implements SctpSendBuffer { EmptySendBuffer() { super(); @@ -285,6 +164,15 @@ final class SctpSendBufferPool { return 0; } + @Override + public long transferTo(WritableByteChannel ch) throws IOException { + return 0; + } + + @Override + public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException { + return 0; + } @Override public void release() { // Unpooled. diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerChannelImpl.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerChannelImpl.java index 142b1ae83f..da3671c3f5 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerChannelImpl.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerChannelImpl.java @@ -35,13 +35,14 @@ import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelSink; +import io.netty.channel.socket.nio.NioChannel; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; /** */ class SctpServerChannelImpl extends AbstractServerChannel - implements SctpServerChannel { + implements SctpServerChannel, NioChannel { private static final InternalLogger logger = InternalLoggerFactory.getInstance(SctpServerChannelImpl.class); @@ -53,13 +54,15 @@ class SctpServerChannelImpl extends AbstractServerChannel private volatile boolean bound; + private SctpWorker worker; + SctpServerChannelImpl( ChannelFactory factory, ChannelPipeline pipeline, - ChannelSink sink) { + ChannelSink sink, SctpWorker worker) { super(factory, pipeline, sink); - + this.worker = worker; try { serverChannel = com.sun.nio.sctp.SctpServerChannel.open(); } catch (IOException e) { @@ -153,4 +156,9 @@ class SctpServerChannelImpl extends AbstractServerChannel protected boolean setClosed() { return super.setClosed(); } + + @Override + public SctpWorker getWorker() { + return worker; + } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java index c4001af990..b1e3b74072 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java @@ -15,22 +15,8 @@ */ package io.netty.channel.sctp; -import static io.netty.channel.Channels.*; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.SocketAddress; -import java.net.SocketTimeoutException; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.ClosedSelectorException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; - -import com.sun.nio.sctp.SctpChannel; - +import static io.netty.channel.Channels.fireChannelBound; +import static io.netty.channel.Channels.fireExceptionCaught; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; @@ -38,25 +24,20 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelState; import io.netty.channel.ChannelStateEvent; import io.netty.channel.MessageEvent; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; -import io.netty.util.internal.DeadLockProofWorker; +import io.netty.channel.socket.nio.AbstractNioChannelSink; +import io.netty.channel.socket.nio.WorkerPool; + +import java.net.InetAddress; +import java.net.SocketAddress; /** */ -class SctpServerPipelineSink extends AbstractSctpChannelSink { +class SctpServerPipelineSink extends AbstractNioChannelSink { - static final InternalLogger logger = - InternalLoggerFactory.getInstance(SctpServerPipelineSink.class); + private final WorkerPool workerPool; - private final SctpWorker[] workers; - private final AtomicInteger workerIndex = new AtomicInteger(); - - SctpServerPipelineSink(Executor workerExecutor, int workerCount) { - workers = new SctpWorker[workerCount]; - for (int i = 0; i < workers.length; i ++) { - workers[i] = new SctpWorker(workerExecutor); - } + SctpServerPipelineSink(WorkerPool workerPool) { + this.workerPool = workerPool; } @Override @@ -85,14 +66,14 @@ class SctpServerPipelineSink extends AbstractSctpChannelSink { switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { - close(channel, future); + channel.getWorker().close(channel, future); } break; case BOUND: if (value != null) { bind(channel, future, (SocketAddress) value); } else { - close(channel, future); + channel.getWorker().close(channel, future); } case INTEREST_OPS: if (event instanceof SctpBindAddressEvent) { @@ -119,51 +100,47 @@ class SctpServerPipelineSink extends AbstractSctpChannelSink { switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { - channel.worker.close(channel, future); + channel.getWorker().close(channel, future); } break; case BOUND: case CONNECTED: if (value == null) { - channel.worker.close(channel, future); + channel.getWorker().close(channel, future); } break; case INTEREST_OPS: - channel.worker.setInterestOps(channel, future, (Integer) value); + channel.getWorker().setInterestOps(channel, future, (Integer) value); break; } } else if (e instanceof MessageEvent) { MessageEvent event = (MessageEvent) e; SctpChannelImpl channel = (SctpChannelImpl) event.getChannel(); - boolean offered = channel.writeBuffer.offer(event); + boolean offered = channel.getWriteBufferQueue().offer(event); assert offered; - channel.worker.writeFromUserCode(channel); + channel.getWorker().writeFromUserCode(channel); } } private void bind( SctpServerChannelImpl channel, ChannelFuture future, SocketAddress localAddress) { - boolean bound = false; - boolean bossStarted = false; try { channel.serverChannel.bind(localAddress, channel.getConfig().getBacklog()); bound = true; - channel.setBound(); + future.setSuccess(); fireChannelBound(channel, channel.getLocalAddress()); - Executor bossExecutor = - ((SctpServerSocketChannelFactory) channel.getFactory()).bossExecutor; - DeadLockProofWorker.start(bossExecutor, new Boss(channel)); - bossStarted = true; + workerPool.nextWorker().registerWithWorker(channel, future); + } catch (Throwable t) { future.setFailure(t); fireExceptionCaught(channel, t); } finally { - if (!bossStarted && bound) { - close(channel, future); + if (!bound) { + channel.getWorker().close(channel, future); } } } @@ -192,146 +169,5 @@ class SctpServerPipelineSink extends AbstractSctpChannelSink { } } - private void close(SctpServerChannelImpl channel, ChannelFuture future) { - boolean bound = channel.isBound(); - try { - if (channel.serverChannel.isOpen()) { - channel.serverChannel.close(); - Selector selector = channel.selector; - if (selector != null) { - selector.wakeup(); - } - } - - // Make sure the boss thread is not running so that that the future - // is notified after a new connection cannot be accepted anymore. - // See NETTY-256 for more information. - channel.shutdownLock.lock(); - try { - if (channel.setClosed()) { - future.setSuccess(); - if (bound) { - fireChannelUnbound(channel); - } - fireChannelClosed(channel); - } else { - future.setSuccess(); - } - } finally { - channel.shutdownLock.unlock(); - } - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - - SctpWorker nextWorker() { - return workers[Math.abs( - workerIndex.getAndIncrement() % workers.length)]; - } - - private final class Boss implements Runnable { - private final Selector selector; - private final SctpServerChannelImpl channel; - - Boss(SctpServerChannelImpl channel) throws IOException { - this.channel = channel; - - selector = Selector.open(); - - boolean registered = false; - try { - channel.serverChannel.register(selector, SelectionKey.OP_ACCEPT); - registered = true; - } finally { - if (!registered) { - closeSelector(); - } - } - - channel.selector = selector; - } - - @Override - public void run() { - final Thread currentThread = Thread.currentThread(); - - channel.shutdownLock.lock(); - try { - for (;;) { - try { - if (selector.select(500) > 0) { - selector.selectedKeys().clear(); - } - - SctpChannel acceptedSocket = channel.serverChannel.accept(); - if (acceptedSocket != null) { - registerAcceptedChannel(acceptedSocket, currentThread); - } - } catch (SocketTimeoutException e) { - // Thrown every second to get ClosedChannelException - // raised. - } catch (CancelledKeyException e) { - // Raised by accept() when the server socket was closed. - } catch (ClosedSelectorException e) { - // Raised by accept() when the server socket was closed. - } catch (ClosedChannelException e) { - // Closed as requested. - break; - } catch (Throwable e) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to accept a connection.", e); - } - try { - Thread.sleep(1000); - } catch (InterruptedException e1) { - // Ignore - } - } - } - } finally { - channel.shutdownLock.unlock(); - closeSelector(); - } - } - - private void registerAcceptedChannel(SctpChannel acceptedSocket, Thread currentThread) { - try { - ChannelPipeline pipeline = - channel.getConfig().getPipelineFactory().getPipeline(); - SctpWorker worker = nextWorker(); - worker.register(new SctpAcceptedChannel( - channel.getFactory(), pipeline, channel, - SctpServerPipelineSink.this, acceptedSocket, - worker, currentThread), null); - } catch (Exception e) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to initialize an accepted socket.", e); - } - try { - acceptedSocket.close(); - } catch (IOException e2) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to close a partially accepted socket.", - e2); - } - } - } - } - - private void closeSelector() { - channel.selector = null; - try { - selector.close(); - } catch (Exception e) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to close a selector.", e); - } - } - } - } + } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerSocketChannelFactory.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerSocketChannelFactory.java index 7ac54e1fdc..c7a632db1c 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerSocketChannelFactory.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerSocketChannelFactory.java @@ -18,7 +18,9 @@ package io.netty.channel.sctp; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelSink; import io.netty.channel.ServerChannelFactory; -import io.netty.util.internal.ExecutorUtil; +import io.netty.channel.socket.nio.SelectorUtil; +import io.netty.channel.socket.nio.WorkerPool; +import io.netty.util.ExternalResourceReleasable; import java.util.concurrent.Executor; @@ -77,9 +79,8 @@ import java.util.concurrent.Executor; */ public class SctpServerSocketChannelFactory implements ServerChannelFactory { - final Executor bossExecutor; - private final Executor workerExecutor; private final ChannelSink sink; + private final WorkerPool workerPool; /** * Creates a new instance. Calling this constructor is same with calling @@ -87,52 +88,45 @@ public class SctpServerSocketChannelFactory implements ServerChannelFactory { * the number of available processors in the machine. The number of * available processors is obtained by {@link Runtime#availableProcessors()}. * - * @param bossExecutor - * the {@link java.util.concurrent.Executor} which will execute the boss threads * @param workerExecutor * the {@link java.util.concurrent.Executor} which will execute the I/O worker threads */ - public SctpServerSocketChannelFactory( - Executor bossExecutor, Executor workerExecutor) { - this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); + public SctpServerSocketChannelFactory(Executor workerExecutor) { + this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); } /** * Creates a new instance. * - * @param bossExecutor - * the {@link java.util.concurrent.Executor} which will execute the boss threads * @param workerExecutor * the {@link java.util.concurrent.Executor} which will execute the I/O worker threads * @param workerCount * the maximum number of I/O worker threads */ - public SctpServerSocketChannelFactory( - Executor bossExecutor, Executor workerExecutor, + public SctpServerSocketChannelFactory(Executor workerExecutor, int workerCount) { - if (bossExecutor == null) { - throw new NullPointerException("bossExecutor"); - } - if (workerExecutor == null) { - throw new NullPointerException("workerExecutor"); - } - if (workerCount <= 0) { - throw new IllegalArgumentException( - "workerCount (" + workerCount + ") " + - "must be a positive integer."); - } - this.bossExecutor = bossExecutor; - this.workerExecutor = workerExecutor; - sink = new SctpServerPipelineSink(workerExecutor, workerCount); + this(new SctpWorkerPool(workerExecutor, workerCount, true)); } + public SctpServerSocketChannelFactory(WorkerPool workerPool) { + if (workerPool == null) { + throw new NullPointerException("workerPool"); + } + this.workerPool = workerPool; + sink = new SctpServerPipelineSink(workerPool); + } + @Override public SctpServerChannel newChannel(ChannelPipeline pipeline) { - return new SctpServerChannelImpl(this, pipeline, sink); + return new SctpServerChannelImpl(this, pipeline, sink, workerPool.nextWorker()); } + @Override public void releaseExternalResources() { - ExecutorUtil.terminate(bossExecutor, workerExecutor); + if (workerPool instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) workerPool).releaseExternalResources(); + } + } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java index 534bab2e8b..83fb429529 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java @@ -15,321 +15,149 @@ */ package io.netty.channel.sctp; -import static io.netty.channel.Channels.*; +import static io.netty.channel.Channels.fireChannelBound; +import static io.netty.channel.Channels.fireChannelClosed; +import static io.netty.channel.Channels.fireChannelConnected; +import static io.netty.channel.Channels.fireChannelUnbound; +import static io.netty.channel.Channels.fireExceptionCaught; +import static io.netty.channel.Channels.fireMessageReceived; +import static io.netty.channel.Channels.fireWriteComplete; +import static io.netty.channel.Channels.succeededFuture; import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBufferFactory; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPipeline; import io.netty.channel.MessageEvent; import io.netty.channel.ReceiveBufferSizePredictor; -import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer; -import io.netty.channel.socket.Worker; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; -import io.netty.util.internal.DeadLockProofWorker; -import io.netty.util.internal.QueueFactory; +import io.netty.channel.sctp.SctpSendBufferPool.SctpSendBuffer; +import io.netty.channel.socket.nio.AbstractNioChannel; +import io.netty.channel.socket.nio.NioWorker; import java.io.IOException; +import java.net.ConnectException; import java.net.SocketAddress; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; -import java.nio.channels.NotYetConnectedException; +import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -import java.util.Iterator; import java.util.Queue; import java.util.Set; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import com.sun.nio.sctp.MessageInfo; +import com.sun.nio.sctp.SctpChannel; /** */ -class SctpWorker implements Worker { +public class SctpWorker extends NioWorker { - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(SctpWorker.class); - - private static final int CONSTRAINT_LEVEL = SctpProviderMetadata.CONSTRAINT_LEVEL; - - static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. - - private final Executor executor; - private boolean started; - volatile Thread thread; - volatile Selector selector; - private final AtomicBoolean wakenUp = new AtomicBoolean(); - private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); - private final Object startStopLock = new Object(); - private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class); - private final Queue writeTaskQueue = QueueFactory.createQueue(Runnable.class); - private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); - - private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation - - private final SctpReceiveBufferPool recvBufferPool = new SctpReceiveBufferPool(); private final SctpSendBufferPool sendBufferPool = new SctpSendBufferPool(); - SctpWorker(Executor executor) { - this.executor = executor; + public SctpWorker(Executor executor) { + super(executor); } - - void register(SctpChannelImpl channel, ChannelFuture future) { - - boolean server = !(channel instanceof SctpClientChannel); - Runnable registerTask = new RegisterTask(channel, future, server); - Selector selector; - - synchronized (startStopLock) { - if (!started) { - // Open a selector if this worker didn't start yet. - try { - this.selector = selector = Selector.open(); - } catch (Throwable t) { - throw new ChannelException( - "Failed to create a selector.", t); - } - - // Start the worker thread with the new Selector. - boolean success = false; - try { - DeadLockProofWorker.start(executor, this); - success = true; - } finally { - if (!success) { - // Release the Selector if the execution fails. - try { - selector.close(); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to close a selector.", t); - } - } - this.selector = selector = null; - // The method will return to the caller at this point. - } - } - } else { - // Use the existing selector if this worker has been started. - selector = this.selector; - } - - assert selector != null && selector.isOpen(); - - started = true; - boolean offered = registerTaskQueue.offer(registerTask); - assert offered; - } - - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } + + public SctpWorker(Executor executor, boolean allowShutdownOnIdle) { + super(executor, allowShutdownOnIdle); } @Override - public void run() { - thread = Thread.currentThread(); - - boolean shutdown = false; - Selector selector = this.selector; - for (; ;) { - wakenUp.set(false); - - if (CONSTRAINT_LEVEL != 0) { - selectorGuard.writeLock().lock(); - // This empty synchronization block prevents the selector - // from acquiring its lock. - selectorGuard.writeLock().unlock(); - } - - try { - SelectorUtil.select(selector); - - // 'wakenUp.compareAndSet(false, true)' is always evaluated - // before calling 'selector.wakeup()' to reduce the wake-up - // overhead. (Selector.wakeup() is an expensive operation.) - // - // However, there is a race condition in this approach. - // The race condition is triggered when 'wakenUp' is set to - // true too early. - // - // 'wakenUp' is set to true too early if: - // 1) Selector is waken up between 'wakenUp.set(false)' and - // 'selector.select(...)'. (BAD) - // 2) Selector is waken up between 'selector.select(...)' and - // 'if (wakenUp.get()) { ... }'. (OK) - // - // In the first case, 'wakenUp' is set to true and the - // following 'selector.select(...)' will wake up immediately. - // Until 'wakenUp' is set to false again in the next round, - // 'wakenUp.compareAndSet(false, true)' will fail, and therefore - // any attempt to wake up the Selector will fail, too, causing - // the following 'selector.select(...)' call to block - // unnecessarily. - // - // To fix this problem, we wake up the selector again if wakenUp - // is true immediately after selector.select(...). - // It is inefficient in that it wakes up the selector for both - // the first case (BAD - wake-up required) and the second case - // (OK - no wake-up required). - - if (wakenUp.get()) { + public void registerWithWorker(final Channel channel, final ChannelFuture future) { + try { + if (channel instanceof SctpServerChannelImpl) { + final SctpServerChannelImpl ch = (SctpServerChannelImpl) channel; + registerTaskQueue.add(new Runnable() { + + @Override + public void run() { + try { + ch.serverChannel.register(selector, SelectionKey.OP_ACCEPT, ch); + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + }); + if (wakenUp.compareAndSet(false, true)) { selector.wakeup(); } - - cancelledKeys = 0; - processRegisterTaskQueue(); - processEventQueue(); - processWriteTaskQueue(); - processSelectedKeys(selector.selectedKeys()); - - // Exit the loop when there's nothing to handle. - // The shutdown flag is used to delay the shutdown of this - // loop to avoid excessive Selector creation when - // connections are registered in a one-by-one manner instead of - // concurrent manner. - if (selector.keys().isEmpty()) { - if (shutdown || - executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) { - - synchronized (startStopLock) { - if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) { - started = false; - try { - selector.close(); - } catch (IOException e) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to close a selector.", e); - } - } finally { - this.selector = null; - } - break; - } else { - shutdown = false; + } else if (channel instanceof SctpClientChannel) { + final SctpClientChannel clientChannel = (SctpClientChannel) channel; + + registerTaskQueue.add(new Runnable() { + + @Override + public void run() { + try { + try { + clientChannel.getJdkChannel().register(selector, clientChannel.getRawInterestOps() | SelectionKey.OP_CONNECT, clientChannel); + } catch (ClosedChannelException e) { + clientChannel.getWorker().close(clientChannel, succeededFuture(channel)); } + int connectTimeout = channel.getConfig().getConnectTimeoutMillis(); + if (connectTimeout > 0) { + clientChannel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L; + } + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); } - } else { - // Give one more second. - shutdown = true; } - } else { - shutdown = false; - } - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn( - "Unexpected exception in the selector loop.", t); - } - // Prevent possible consecutive immediate failures that lead to - // excessive CPU consumption. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore. + }); + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); } + } else { + super.registerWithWorker(channel, future); } + + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); } } + + @Override + protected boolean accept(SelectionKey key) { + SctpServerChannelImpl channel = (SctpServerChannelImpl) key.attachment(); + try { + SctpChannel acceptedSocket = channel.serverChannel.accept(); + if (acceptedSocket != null) { + + ChannelPipeline pipeline = + channel.getConfig().getPipelineFactory().getPipeline(); + registerTask(new SctpAcceptedChannel(channel.getFactory(), pipeline, channel, + channel.getPipeline().getSink(), acceptedSocket, this), null); + return true; + } + return false; + } catch (SocketTimeoutException e) { + // Thrown every second to get ClosedChannelException + // raised. + } catch (CancelledKeyException e) { + // Raised by accept() when the server socket was closed. + } catch (ClosedSelectorException e) { + // Raised by accept() when the server socket was closed. + } catch (ClosedChannelException e) { + // Closed as requested. + } catch (Throwable e) { + if (logger.isWarnEnabled()) { + logger.warn( + "Failed to accept a connection.", e); + } + } + return true; + } + @Override - public void executeInIoThread(Runnable task) { - if (Thread.currentThread() == thread) { - task.run(); - } else { - boolean added = eventQueue.offer(task); - - if (added) { - // wake up the selector to speed things - selector.wakeup(); - } - - } - - } - - static boolean isIoThread(SctpChannelImpl channel) { - return Thread.currentThread() == channel.worker.thread; - } - - private void processRegisterTaskQueue() throws IOException { - for (; ;) { - final Runnable task = registerTaskQueue.poll(); - if (task == null) { - break; - } - - task.run(); - cleanUpCancelledKeys(); - } - } - - private void processWriteTaskQueue() throws IOException { - for (; ;) { - final Runnable task = writeTaskQueue.poll(); - if (task == null) { - break; - } - - task.run(); - cleanUpCancelledKeys(); - } - } - - private void processEventQueue() throws IOException { - for (;;) { - final Runnable task = eventQueue.poll(); - if (task == null) { - break; - } - task.run(); - cleanUpCancelledKeys(); - } - } - - private void processSelectedKeys(final Set selectedKeys) throws IOException { - for (Iterator i = selectedKeys.iterator(); i.hasNext();) { - SelectionKey k = i.next(); - i.remove(); - try { - int readyOps = k.readyOps(); - if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { - if (!read(k)) { - // Connection already closed - no need to handle write. - continue; - } - } - if ((readyOps & SelectionKey.OP_WRITE) != 0) { - writeFromSelectorLoop(k); - } - } catch (CancelledKeyException e) { - close(k); - } - - if (cleanUpCancelledKeys()) { - break; // break the loop to avoid ConcurrentModificationException - } - } - } - - private boolean cleanUpCancelledKeys() throws IOException { - if (cancelledKeys >= CLEANUP_INTERVAL) { - cancelledKeys = 0; - selector.selectNow(); - return true; - } - return false; - } - - private boolean read(SelectionKey k) { + protected boolean read(SelectionKey k) { final SctpChannelImpl channel = (SctpChannelImpl) k.attachment(); final ReceiveBufferSizePredictor predictor = @@ -341,7 +169,7 @@ class SctpWorker implements Worker { ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize); try { - messageInfo = channel.channel.receive(bb, null, channel.notificationHandler); + messageInfo = channel.getJdkChannel().getChannel().receive(bb, null, channel.notificationHandler); messageReceived = messageInfo != null; } catch (ClosedChannelException e) { // Can happen, and does not need a user attention. @@ -372,7 +200,7 @@ class SctpWorker implements Worker { recvBufferPool.release(bb); } - if (channel.channel.isBlocking() && !messageReceived) { + if (channel.getJdkChannel().getChannel().isBlocking() && !messageReceived) { k.cancel(); // Some JDK implementations run into an infinite loop without this. close(channel, succeededFuture(channel)); return false; @@ -381,82 +209,25 @@ class SctpWorker implements Worker { return true; } - private void close(SelectionKey k) { - SctpChannelImpl ch = (SctpChannelImpl) k.attachment(); - close(ch, succeededFuture(ch)); - } - - void writeFromUserCode(final SctpChannelImpl channel) { - if (!channel.isConnected()) { - cleanUpWriteBuffer(channel); - return; - } - - if (scheduleWriteIfNecessary(channel)) { - return; - } - - // From here, we are sure Thread.currentThread() == workerThread. - - if (channel.writeSuspended) { - return; - } - - if (channel.inWriteNowLoop) { - return; - } - - write0(channel); - } - - void writeFromTaskLoop(final SctpChannelImpl ch) { - if (!ch.writeSuspended) { - write0(ch); - } - } - - void writeFromSelectorLoop(final SelectionKey k) { - SctpChannelImpl ch = (SctpChannelImpl) k.attachment(); - ch.writeSuspended = false; - write0(ch); - } - - private boolean scheduleWriteIfNecessary(final SctpChannelImpl channel) { - final Thread currentThread = Thread.currentThread(); - final Thread workerThread = thread; - if (currentThread != workerThread) { - if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { - boolean offered = writeTaskQueue.offer(channel.writeTask); - assert offered; + @Override + protected void connect(SelectionKey k) { + final SctpClientChannel ch = (SctpClientChannel) k.attachment(); + try { + // TODO: Remove cast + if (ch.getJdkChannel().finishConnect()) { + registerTask(ch, ch.connectFuture); } - - if (!(channel instanceof SctpAcceptedChannel) || - ((SctpAcceptedChannel) channel).bossThread != currentThread) { - final Selector workerSelector = selector; - if (workerSelector != null) { - if (wakenUp.compareAndSet(false, true)) { - workerSelector.wakeup(); - } - } - } else { - // A write request can be made from an acceptor thread (boss) - // when a user attempted to write something in: - // - // * channelOpen() - // * channelBound() - // * channelConnected(). - // - // In this case, there's no need to wake up the selector because - // the channel is not even registered yet at this moment. - } - - return true; + } catch (Throwable t) { + ch.connectFuture.setFailure(t); + fireExceptionCaught(ch, t); + k.cancel(); // Some JDK implementations run into an infinite loop without this. + ch.getWorker().close(ch, succeededFuture(ch)); } - - return false; } - - private void write0(SctpChannelImpl channel) { + + @Override + protected void write0(AbstractNioChannel ach) { + SctpChannelImpl channel = (SctpChannelImpl) ach; boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; @@ -464,24 +235,26 @@ class SctpWorker implements Worker { long writtenBytes = 0; final SctpSendBufferPool sendBufferPool = this.sendBufferPool; - final com.sun.nio.sctp.SctpChannel ch = channel.channel; - final Queue writeBuffer = channel.writeBuffer; + final com.sun.nio.sctp.SctpChannel ch = channel.getJdkChannel().getChannel(); + final Queue writeBuffer = channel.getWriteBufferQueue(); final int writeSpinCount = channel.getConfig().getWriteSpinCount(); - synchronized (channel.writeLock) { - channel.inWriteNowLoop = true; + synchronized (channel.getWriteLock()) { + channel.setInWriteNowLoop(true); for (; ;) { - MessageEvent evt = channel.currentWriteEvent; - SendBuffer buf; + MessageEvent evt = channel.getCurrentWriteEvent(); + SctpSendBuffer buf; if (evt == null) { - if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) { + if ((evt = writeBuffer.poll()) == null) { removeOpWrite = true; - channel.writeSuspended = false; + channel.setWriteSuspended(false); break; } + channel.setCurrentWriteEvent(evt); - channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage()); + buf = sendBufferPool.acquire(evt.getMessage()); + channel.setCurrentWriteBuffer(buf); } else { - buf = channel.currentWriteBuffer; + buf = channel.getCurrentWriteBuffer(); } ChannelFuture future = evt.getFuture(); @@ -501,15 +274,15 @@ class SctpWorker implements Worker { if (buf.finished()) { // Successful write - proceed to the next message. buf.release(); - channel.currentWriteEvent = null; - channel.currentWriteBuffer = null; + channel.setCurrentWriteEvent(null); + channel.setCurrentWriteBuffer(null); evt = null; buf = null; future.setSuccess(); } else { // Not written fully - perhaps the kernel buffer is full. addOpWrite = true; - channel.writeSuspended = true; + channel.setWriteSuspended(true); if (localWrittenBytes > 0) { // Notify progress listeners if necessary. @@ -523,8 +296,8 @@ class SctpWorker implements Worker { // Doesn't need a user attention - ignore. } catch (Throwable t) { buf.release(); - channel.currentWriteEvent = null; - channel.currentWriteBuffer = null; + channel.setCurrentWriteEvent(null); + channel.setCurrentWriteBuffer(null); buf = null; evt = null; future.setFailure(t); @@ -535,7 +308,7 @@ class SctpWorker implements Worker { } } } - channel.inWriteNowLoop = false; + channel.setInWriteNowLoop(false); } if (open) { @@ -549,263 +322,142 @@ class SctpWorker implements Worker { fireWriteComplete(channel, writtenBytes); } - private void setOpWrite(SctpChannelImpl channel) { - Selector selector = this.selector; - SelectionKey key = channel.channel.keyFor(selector); - if (key == null) { - return; - } - if (!key.isValid()) { - close(key); - return; - } - - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - int interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) == 0) { - interestOps |= SelectionKey.OP_WRITE; - key.interestOps(interestOps); - channel.setRawInterestOpsNow(interestOps); + @Override + protected void registerTask(AbstractNioChannel ch, ChannelFuture future) { + boolean server = !(ch instanceof SctpClientChannel); + SctpChannelImpl channel = (SctpChannelImpl) ch; + + SocketAddress localAddress = channel.getLocalAddress(); + SocketAddress remoteAddress = channel.getRemoteAddress(); + if (localAddress == null || remoteAddress == null) { + if (future != null) { + future.setFailure(new ClosedChannelException()); } - } - } - - private void clearOpWrite(SctpChannelImpl channel) { - Selector selector = this.selector; - SelectionKey key = channel.channel.keyFor(selector); - if (key == null) { - return; - } - if (!key.isValid()) { - close(key); + close(channel, succeededFuture(channel)); return; } - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - int interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) != 0) { - interestOps &= ~SelectionKey.OP_WRITE; - key.interestOps(interestOps); - channel.setRawInterestOpsNow(interestOps); - } - } - } - - void close(SctpChannelImpl channel, ChannelFuture future) { - boolean connected = channel.isConnected(); - boolean bound = channel.isBound(); try { - channel.channel.close(); - cancelledKeys++; - if (channel.setClosed()) { - future.setSuccess(); - if (connected) { - fireChannelDisconnected(channel); - } - if (bound) { - fireChannelUnbound(channel); - } - - cleanUpWriteBuffer(channel); - fireChannelClosed(channel); - } else { - future.setSuccess(); - } - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - - private void cleanUpWriteBuffer(SctpChannelImpl channel) { - Exception cause = null; - boolean fireExceptionCaught = false; - - // Clean up the stale messages in the write buffer. - synchronized (channel.writeLock) { - MessageEvent evt = channel.currentWriteEvent; - if (evt != null) { - // Create the exception only once to avoid the excessive overhead - // caused by fillStackTrace. - if (channel.isOpen()) { - cause = new NotYetConnectedException(); - } else { - cause = new ClosedChannelException(); - } - - ChannelFuture future = evt.getFuture(); - channel.currentWriteBuffer.release(); - channel.currentWriteBuffer = null; - channel.currentWriteEvent = null; - evt = null; - future.setFailure(cause); - fireExceptionCaught = true; - } - - Queue writeBuffer = channel.writeBuffer; - if (!writeBuffer.isEmpty()) { - // Create the exception only once to avoid the excessive overhead - // caused by fillStackTrace. - if (cause == null) { - if (channel.isOpen()) { - cause = new NotYetConnectedException(); - } else { - cause = new ClosedChannelException(); - } - } - - for (; ;) { - evt = writeBuffer.poll(); - if (evt == null) { - break; - } - evt.getFuture().setFailure(cause); - fireExceptionCaught = true; - } - } - } - - if (fireExceptionCaught) { - fireExceptionCaught(channel, cause); - } - } - - void setInterestOps( - SctpChannelImpl channel, ChannelFuture future, int interestOps) { - boolean changed = false; - try { - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - Selector selector = this.selector; - SelectionKey key = channel.channel.keyFor(selector); - - if (key == null || selector == null) { - // Not registered to the worker yet. - // Set the rawInterestOps immediately; RegisterTask will pick it up. - channel.setRawInterestOpsNow(interestOps); - return; - } - - // Override OP_WRITE flag - a user cannot change this flag. - interestOps &= ~Channel.OP_WRITE; - interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; - - switch (CONSTRAINT_LEVEL) { - case 0: - if (channel.getRawInterestOps() != interestOps) { - key.interestOps(interestOps); - if (Thread.currentThread() != thread && - wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - changed = true; - } - break; - case 1: - case 2: - if (channel.getRawInterestOps() != interestOps) { - if (Thread.currentThread() == thread) { - key.interestOps(interestOps); - changed = true; - } else { - selectorGuard.readLock().lock(); - try { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - key.interestOps(interestOps); - changed = true; - } finally { - selectorGuard.readLock().unlock(); - } - } - } - break; - default: - throw new Error(); - } - - if (changed) { - channel.setRawInterestOpsNow(interestOps); - } - } - - future.setSuccess(); - if (changed) { - fireChannelInterestChanged(channel); - } - } catch (CancelledKeyException e) { - // setInterestOps() was called on a closed channel. - ClosedChannelException cce = new ClosedChannelException(); - future.setFailure(cce); - fireExceptionCaught(channel, cce); - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - - private final class RegisterTask implements Runnable { - private final SctpChannelImpl channel; - private final ChannelFuture future; - private final boolean server; - - RegisterTask( - SctpChannelImpl channel, ChannelFuture future, boolean server) { - - this.channel = channel; - this.future = future; - this.server = server; - } - - @Override - public void run() { - SocketAddress localAddress = channel.getLocalAddress(); - SocketAddress remoteAddress = channel.getRemoteAddress(); - if (localAddress == null || remoteAddress == null) { - if (future != null) { - future.setFailure(new ClosedChannelException()); - } - close(channel, succeededFuture(channel)); - return; - } - - try { - if (server) { - channel.channel.configureBlocking(false); - } - - synchronized (channel.interestOpsLock) { - channel.channel.register( + boolean registered = channel.getJdkChannel().isRegistered(); + if (!registered) { + synchronized (channel.getInterestedOpsLock()) { + channel.getJdkChannel().register( selector, channel.getRawInterestOps(), channel); } - channel.setConnected(); - if (future != null) { - future.setSuccess(); + + } else { + // TODO: Is this needed ? + setInterestOps(channel, future, channel.getRawInterestOps()); + } + if (future != null) { + ((SctpChannelImpl) channel).setConnected(); + future.setSuccess(); + } + + } catch (IOException e) { + if (future != null) { + future.setFailure(e); + } + close(channel, succeededFuture(channel)); + if (!(e instanceof ClosedChannelException)) { + throw new ChannelException( + "Failed to register a socket to the selector.", e); + } + } + + if (!server) { + if (!((SctpClientChannel) channel).boundManually) { + fireChannelBound(channel, localAddress); + } + fireChannelConnected(channel, remoteAddress); + } + } + + @Override + protected void processConnectTimeout(Set keys, long currentTimeNanos) { + ConnectException cause = null; + for (SelectionKey k: keys) { + if (!k.isValid()) { + // Comment the close call again as it gave us major problems with ClosedChannelExceptions. + // + // See: + // * https://github.com/netty/netty/issues/142 + // * https://github.com/netty/netty/issues/138 + // + //close(k); + continue; + } + + // Something is ready so skip it + if (k.readyOps() != 0) { + continue; + } + // check if the channel is in + Object attachment = k.attachment(); + if (attachment instanceof SctpClientChannel) { + SctpClientChannel ch = (SctpClientChannel) attachment; + if (!ch.isConnected() && ch.connectDeadlineNanos > 0 && currentTimeNanos >= ch.connectDeadlineNanos) { + + if (cause == null) { + cause = new ConnectException("connection timed out"); + } + + ch.connectFuture.setFailure(cause); + fireExceptionCaught(ch, cause); + ch.getWorker().close(ch, succeededFuture(ch)); } - } catch (IOException e) { - if (future != null) { - future.setFailure(e); - } - close(channel, succeededFuture(channel)); - if (!(e instanceof ClosedChannelException)) { - throw new ChannelException( - "Failed to register a socket to the selector.", e); + } + + + + } + } + + @Override + protected void close(SelectionKey k) { + Object attachment = k.attachment(); + if (attachment instanceof SctpServerChannelImpl) { + SctpServerChannelImpl ch = (SctpServerChannelImpl) attachment; + close(ch, succeededFuture(ch)); + } else { + super.close(k); + } + } + + void close(SctpServerChannelImpl channel, ChannelFuture future) { + boolean bound = channel.isBound(); + try { + if (channel.serverChannel.isOpen()) { + channel.serverChannel.close(); + Selector selector = channel.selector; + if (selector != null) { + selector.wakeup(); } } - if (!server) { - if (!((SctpClientChannel) channel).boundManually) { - fireChannelBound(channel, localAddress); + // Make sure the boss thread is not running so that that the future + // is notified after a new connection cannot be accepted anymore. + // See NETTY-256 for more information. + channel.shutdownLock.lock(); + try { + if (channel.setClosed()) { + future.setSuccess(); + if (bound) { + fireChannelUnbound(channel); + } + fireChannelClosed(channel); + } else { + future.setSuccess(); } - fireChannelConnected(channel, remoteAddress); + } finally { + channel.shutdownLock.unlock(); } + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); } } + + } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorkerPool.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorkerPool.java new file mode 100644 index 0000000000..899eedf2a4 --- /dev/null +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorkerPool.java @@ -0,0 +1,33 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.sctp; + +import java.util.concurrent.Executor; + +import io.netty.channel.socket.nio.AbstractNioWorkerPool; + +public class SctpWorkerPool extends AbstractNioWorkerPool { + + public SctpWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { + super(executor, workerCount, allowShutdownOnIdle); + } + + @Override + protected SctpWorker createWorker(Executor executor, boolean allowShutdownOnIdle) { + return new SctpWorker(executor, allowShutdownOnIdle); + } + +} diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SelectorUtil.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SelectorUtil.java deleted file mode 100644 index 77c4035cb5..0000000000 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SelectorUtil.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.sctp; - -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; - -import java.io.IOException; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.Selector; - -final class SelectorUtil { - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(SelectorUtil.class); - - static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2; - - static void select(Selector selector) throws IOException { - try { - selector.select(10); // does small timeout give more throughput + less CPU usage? - } catch (CancelledKeyException e) { - if (logger.isDebugEnabled()) { - // Harmless exception - log anyway - logger.debug( - CancelledKeyException.class.getSimpleName() + - " raised by a Selector - JDK bug?", e); - } - - } - } - - private SelectorUtil() { - // Unused - } -} diff --git a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpClientBootstrapTest.java b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpClientBootstrapTest.java index 33845d13b8..c4769b60c1 100644 --- a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpClientBootstrapTest.java +++ b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpClientBootstrapTest.java @@ -24,6 +24,6 @@ import java.util.concurrent.Executor; public class SctpClientBootstrapTest extends AbstractSocketClientBootstrapTest { @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new SctpClientSocketChannelFactory(executor, executor); + return new SctpClientSocketChannelFactory(executor); } } diff --git a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpCompatibleObjectStreamEchoTest.java b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpCompatibleObjectStreamEchoTest.java index dc38655dce..2bfb16595c 100644 --- a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpCompatibleObjectStreamEchoTest.java +++ b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpCompatibleObjectStreamEchoTest.java @@ -25,11 +25,11 @@ import java.util.concurrent.Executor; public class SctpCompatibleObjectStreamEchoTest extends AbstractSocketCompatibleObjectStreamEchoTest { @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new SctpServerSocketChannelFactory(executor, executor); + return new SctpServerSocketChannelFactory(executor); } @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new SctpClientSocketChannelFactory(executor, executor); + return new SctpClientSocketChannelFactory(executor); } } diff --git a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpEchoTest.java b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpEchoTest.java index bbe24bde2a..a5cc3cab20 100644 --- a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpEchoTest.java +++ b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpEchoTest.java @@ -25,11 +25,11 @@ import java.util.concurrent.Executor; public class SctpEchoTest extends AbstractSocketEchoTest { @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new SctpServerSocketChannelFactory(executor, executor); + return new SctpServerSocketChannelFactory(executor); } @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new SctpClientSocketChannelFactory(executor, executor); + return new SctpClientSocketChannelFactory(executor); } } diff --git a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpFixedLengthEchoTest.java b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpFixedLengthEchoTest.java index a1e590438d..44a2ea1bb4 100644 --- a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpFixedLengthEchoTest.java +++ b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpFixedLengthEchoTest.java @@ -25,11 +25,12 @@ import java.util.concurrent.Executor; public class SctpFixedLengthEchoTest extends AbstractSocketFixedLengthEchoTest { @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new SctpServerSocketChannelFactory(executor, executor); + return new SctpServerSocketChannelFactory(executor); } @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new SctpClientSocketChannelFactory(executor, executor); + return new SctpClientSocketChannelFactory(executor); } + } diff --git a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpMultiHomingEchoTest.java b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpMultiHomingEchoTest.java index f2057168d9..12e153a8ed 100644 --- a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpMultiHomingEchoTest.java +++ b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpMultiHomingEchoTest.java @@ -72,11 +72,11 @@ public class SctpMultiHomingEchoTest { } protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new SctpServerSocketChannelFactory(executor, executor); + return new SctpServerSocketChannelFactory(executor); } protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new SctpClientSocketChannelFactory(executor, executor); + return new SctpClientSocketChannelFactory(executor); } @Test(timeout = 15000) diff --git a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpMultiStreamingEchoTest.java b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpMultiStreamingEchoTest.java index 7f8b104db8..c724ec081c 100644 --- a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpMultiStreamingEchoTest.java +++ b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpMultiStreamingEchoTest.java @@ -72,11 +72,11 @@ public class SctpMultiStreamingEchoTest { } protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new SctpServerSocketChannelFactory(executor, executor); + return new SctpServerSocketChannelFactory(executor); } protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new SctpClientSocketChannelFactory(executor, executor); + return new SctpClientSocketChannelFactory(executor); } @Test(timeout = 10000) diff --git a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpObjectStreamEchoTest.java b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpObjectStreamEchoTest.java index 55c84e0684..bd3ed20428 100644 --- a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpObjectStreamEchoTest.java +++ b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpObjectStreamEchoTest.java @@ -25,11 +25,11 @@ import java.util.concurrent.Executor; public class SctpObjectStreamEchoTest extends AbstractSocketObjectStreamEchoTest { @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new SctpServerSocketChannelFactory(executor, executor); + return new SctpServerSocketChannelFactory(executor); } @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new SctpClientSocketChannelFactory(executor, executor); + return new SctpClientSocketChannelFactory(executor); } } diff --git a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpServerBootstrapTest.java b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpServerBootstrapTest.java index a20c6ae0fe..2e450c7178 100644 --- a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpServerBootstrapTest.java +++ b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpServerBootstrapTest.java @@ -24,6 +24,6 @@ import java.util.concurrent.Executor; public class SctpServerBootstrapTest extends AbstractSocketServerBootstrapTest { @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new SctpServerSocketChannelFactory(executor, executor); + return new SctpServerSocketChannelFactory(executor); } } diff --git a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpSslEchoTest.java b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpSslEchoTest.java index e840cce264..1bdfc323f7 100644 --- a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpSslEchoTest.java +++ b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpSslEchoTest.java @@ -25,11 +25,11 @@ import java.util.concurrent.Executor; public class SctpSslEchoTest extends AbstractSocketSslEchoTest { @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new SctpServerSocketChannelFactory(executor, executor); + return new SctpServerSocketChannelFactory(executor); } @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new SctpClientSocketChannelFactory(executor, executor); + return new SctpClientSocketChannelFactory(executor); } } diff --git a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpStringEchoTest.java b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpStringEchoTest.java index 31440bec6a..a0709c176d 100644 --- a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpStringEchoTest.java +++ b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpStringEchoTest.java @@ -25,11 +25,11 @@ import java.util.concurrent.Executor; public class SctpStringEchoTest extends AbstractSocketStringEchoTest { @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new SctpServerSocketChannelFactory(executor, executor); + return new SctpServerSocketChannelFactory(executor); } @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new SctpClientSocketChannelFactory(executor, executor); + return new SctpClientSocketChannelFactory(executor); } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractJdkChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractJdkChannel.java index 8629711e23..ed639a8add 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractJdkChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractJdkChannel.java @@ -25,7 +25,7 @@ public abstract class AbstractJdkChannel implements JdkChannel { final AbstractSelectableChannel channel; - AbstractJdkChannel(AbstractSelectableChannel channel) { + protected AbstractJdkChannel(AbstractSelectableChannel channel) { this.channel = channel; } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index 03ff40a7c6..ba10b67a25 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -24,7 +24,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelSink; import io.netty.channel.MessageEvent; -import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; +import io.netty.channel.socket.nio.SendBufferPool.SendBuffer; import io.netty.util.internal.QueueFactory; import io.netty.util.internal.ThreadLocalBoolean; @@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -abstract class AbstractNioChannel extends AbstractChannel implements NioChannel { +public abstract class AbstractNioChannel extends AbstractChannel implements NioChannel { /** * The {@link AbstractNioWorker}. @@ -48,12 +48,12 @@ abstract class AbstractNioChannel extends AbstractChannel implements NioChannel /** * Monitor object to synchronize access to InterestedOps. */ - final Object interestOpsLock = new Object(); + protected final Object interestOpsLock = new Object(); /** * Monitor object for synchronizing access to the {@link WriteRequestQueue}. */ - final Object writeLock = new Object(); + protected final Object writeLock = new Object(); /** * WriteTask that performs write operations. @@ -68,7 +68,7 @@ abstract class AbstractNioChannel extends AbstractChannel implements NioChannel /** * Queue of write {@link MessageEvent}s. */ - final Queue writeBufferQueue = new WriteRequestQueue(); + protected final Queue writeBufferQueue = createRequestQueue(); /** * Keeps track of the number of bytes that the {@link WriteRequestQueue} currently @@ -84,14 +84,14 @@ abstract class AbstractNioChannel extends AbstractChannel implements NioChannel /** * The current write {@link MessageEvent} */ - MessageEvent currentWriteEvent; - SendBuffer currentWriteBuffer; + protected MessageEvent currentWriteEvent; + protected SendBuffer currentWriteBuffer; /** * Boolean that indicates that write operation is in progress. */ - boolean inWriteNowLoop; - boolean writeSuspended; + protected boolean inWriteNowLoop; + protected boolean writeSuspended; private volatile InetSocketAddress localAddress; @@ -213,7 +213,11 @@ abstract class AbstractNioChannel extends AbstractChannel implements NioChannel return super.setClosed(); } - private final class WriteRequestQueue implements BlockingQueue { + protected WriteRequestQueue createRequestQueue() { + return new WriteRequestQueue(); + } + + public class WriteRequestQueue implements BlockingQueue { private final ThreadLocalBoolean notifying = new ThreadLocalBoolean(); private final BlockingQueue queue; @@ -381,7 +385,7 @@ abstract class AbstractNioChannel extends AbstractChannel implements NioChannel return e; } - private int getMessageSize(MessageEvent e) { + protected int getMessageSize(MessageEvent e) { Object m = e.getMessage(); if (m instanceof ChannelBuffer) { return ((ChannelBuffer) m).readableBytes(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index d0d6ccf5b7..481f8823a1 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -23,7 +23,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; import io.netty.channel.MessageEvent; import io.netty.channel.socket.Worker; -import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; +import io.netty.channel.socket.nio.SendBufferPool.SendBuffer; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.DeadLockProofWorker; @@ -54,7 +54,7 @@ abstract class AbstractNioWorker implements Worker { /** * Internal Netty logger. */ - private static final InternalLogger logger = InternalLoggerFactory + protected static final InternalLogger logger = InternalLoggerFactory .getInstance(AbstractNioWorker.class); private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL; @@ -104,7 +104,7 @@ abstract class AbstractNioWorker implements Worker { /** * Queue of channel registration tasks. */ - private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class); + protected final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class); /** * Queue of WriteTasks @@ -116,7 +116,7 @@ abstract class AbstractNioWorker implements Worker { private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation - private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); + private final SendBufferPool sendBufferPool = new SendBufferPool(); private final boolean allowShutdownOnIdle; @@ -130,7 +130,7 @@ abstract class AbstractNioWorker implements Worker { } - public final void registerWithWorker(final Channel channel, final ChannelFuture future) { + public void registerWithWorker(final Channel channel, final ChannelFuture future) { final Selector selector = start(); try { @@ -203,7 +203,7 @@ abstract class AbstractNioWorker implements Worker { * * @return selector */ - private Selector start() { + protected final Selector start() { synchronized (startStopLock) { if (!started && selector == null) { // Open a selector if this worker didn't start yet. @@ -461,7 +461,7 @@ abstract class AbstractNioWorker implements Worker { } } - private boolean accept(SelectionKey key) { + protected boolean accept(SelectionKey key) { NioServerSocketChannel channel = (NioServerSocketChannel) key.attachment(); try { SocketChannel acceptedSocket = channel.socket.accept(); @@ -494,7 +494,7 @@ abstract class AbstractNioWorker implements Worker { } - private void processConnectTimeout(Set keys, long currentTimeNanos) { + protected void processConnectTimeout(Set keys, long currentTimeNanos) { ConnectException cause = null; for (SelectionKey k: keys) { if (!k.isValid()) { @@ -533,7 +533,7 @@ abstract class AbstractNioWorker implements Worker { } } - private void connect(SelectionKey k) { + protected void connect(SelectionKey k) { final NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); try { // TODO: Remove cast @@ -559,7 +559,7 @@ abstract class AbstractNioWorker implements Worker { - private void close(SelectionKey k) { + protected void close(SelectionKey k) { Object attachment = k.attachment(); if (attachment instanceof AbstractNioChannel) { AbstractNioChannel ch = (AbstractNioChannel) attachment; @@ -572,7 +572,7 @@ abstract class AbstractNioWorker implements Worker { } } - void writeFromUserCode(final AbstractNioChannel channel) { + public void writeFromUserCode(final AbstractNioChannel channel) { if (!channel.isConnected()) { cleanUpWriteBuffer(channel); @@ -595,7 +595,7 @@ abstract class AbstractNioWorker implements Worker { write0(channel); } - void writeFromTaskLoop(AbstractNioChannel ch) { + public void writeFromTaskLoop(AbstractNioChannel ch) { if (!ch.writeSuspended) { write0(ch); } @@ -636,7 +636,7 @@ abstract class AbstractNioWorker implements Worker { long writtenBytes = 0; - final SocketSendBufferPool sendBufferPool = this.sendBufferPool; + final SendBufferPool sendBufferPool = this.sendBufferPool; final WritableByteChannel ch = channel.getJdkChannel(); final Queue writeBuffer = channel.writeBufferQueue; @@ -746,7 +746,7 @@ abstract class AbstractNioWorker implements Worker { return Thread.currentThread() == thread; } - private void setOpWrite(AbstractNioChannel channel) { + protected void setOpWrite(AbstractNioChannel channel) { Selector selector = this.selector; SelectionKey key = channel.getJdkChannel().keyFor(selector); if (key == null) { @@ -769,7 +769,7 @@ abstract class AbstractNioWorker implements Worker { } } - private void clearOpWrite(AbstractNioChannel channel) { + protected void clearOpWrite(AbstractNioChannel channel) { Selector selector = this.selector; SelectionKey key = channel.getJdkChannel().keyFor(selector); if (key == null) { @@ -793,7 +793,7 @@ abstract class AbstractNioWorker implements Worker { } - void close(NioServerSocketChannel channel, ChannelFuture future) { + public void close(NioServerSocketChannel channel, ChannelFuture future) { boolean isIoThread = isIoThread(); boolean bound = channel.isBound(); @@ -842,7 +842,7 @@ abstract class AbstractNioWorker implements Worker { } } - void close(AbstractNioChannel channel, ChannelFuture future) { + public void close(AbstractNioChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); boolean bound = channel.isBound(); boolean iothread = isIoThread(); @@ -945,7 +945,7 @@ abstract class AbstractNioWorker implements Worker { } } - void setInterestOps(AbstractNioChannel channel, ChannelFuture future, int interestOps) { + public void setInterestOps(AbstractNioChannel channel, ChannelFuture future, int interestOps) { boolean changed = false; boolean iothread = isIoThread(); try { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorkerPool.java index 45a181f543..ce1e314eb9 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorkerPool.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorkerPool.java @@ -42,7 +42,7 @@ public abstract class AbstractNioWorkerPool impleme * @param allowShutdownOnIdle allow the {@link Worker}'s to shutdown when there is not {@link Channel} is registered with it * @param workerCount the count of {@link Worker}'s to create */ - AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean allowShutDownOnIdle) { + protected AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean allowShutDownOnIdle) { if (workerExecutor == null) { throw new NullPointerException("workerExecutor"); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java index 5524be990d..a6b31cc45e 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java @@ -36,7 +36,7 @@ import java.util.concurrent.Executor; public class NioWorker extends AbstractNioWorker { - private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool(); + protected final ReceiveBufferPool recvBufferPool = new ReceiveBufferPool(); public NioWorker(Executor executor) { super(executor); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/SocketReceiveBufferPool.java b/transport/src/main/java/io/netty/channel/socket/nio/ReceiveBufferPool.java similarity index 95% rename from transport/src/main/java/io/netty/channel/socket/nio/SocketReceiveBufferPool.java rename to transport/src/main/java/io/netty/channel/socket/nio/ReceiveBufferPool.java index de2b53ff83..701cd171cc 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/SocketReceiveBufferPool.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/ReceiveBufferPool.java @@ -18,14 +18,14 @@ package io.netty.channel.socket.nio; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; -final class SocketReceiveBufferPool { +public final class ReceiveBufferPool { private static final int POOL_SIZE = 8; @SuppressWarnings("unchecked") private final SoftReference[] pool = new SoftReference[POOL_SIZE]; - ByteBuffer acquire(int size) { + public ByteBuffer acquire(int size) { final SoftReference[] pool = this.pool; for (int i = 0; i < POOL_SIZE; i ++) { SoftReference ref = pool[i]; @@ -53,7 +53,7 @@ final class SocketReceiveBufferPool { return buf; } - void release(ByteBuffer buffer) { + public void release(ByteBuffer buffer) { final SoftReference[] pool = this.pool; for (int i = 0; i < POOL_SIZE; i ++) { SoftReference ref = pool[i]; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java b/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java index 511f5dcf26..8f03731e11 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java @@ -22,11 +22,11 @@ import java.nio.channels.Selector; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; -final class SelectorUtil { +public final class SelectorUtil { private static final InternalLogger logger = InternalLoggerFactory.getInstance(SelectorUtil.class); - static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2; + public static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2; // Workaround for JDK NIO bug. // diff --git a/transport/src/main/java/io/netty/channel/socket/nio/SocketSendBufferPool.java b/transport/src/main/java/io/netty/channel/socket/nio/SendBufferPool.java similarity index 85% rename from transport/src/main/java/io/netty/channel/socket/nio/SocketSendBufferPool.java rename to transport/src/main/java/io/netty/channel/socket/nio/SendBufferPool.java index bd0ebbd3f5..c25db6aa1d 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/SocketSendBufferPool.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/SendBufferPool.java @@ -25,21 +25,22 @@ import java.nio.channels.WritableByteChannel; import io.netty.buffer.ChannelBuffer; import io.netty.channel.FileRegion; -final class SocketSendBufferPool { +public class SendBufferPool { private static final SendBuffer EMPTY_BUFFER = new EmptySendBuffer(); - private static final int DEFAULT_PREALLOCATION_SIZE = 65536; - private static final int ALIGN_SHIFT = 4; - private static final int ALIGN_MASK = 15; + public static final int DEFAULT_PREALLOCATION_SIZE = 65536; + public static final int ALIGN_SHIFT = 4; + public static final int ALIGN_MASK = 15; - PreallocationRef poolHead; - Preallocation current = new Preallocation(DEFAULT_PREALLOCATION_SIZE); + protected PreallocationRef poolHead; + protected Preallocation current = new Preallocation(DEFAULT_PREALLOCATION_SIZE); - SocketSendBufferPool() { + public SendBufferPool() { } - SendBuffer acquire(Object message) { + + public SendBuffer acquire(Object message) { if (message instanceof ChannelBuffer) { return acquire((ChannelBuffer) message); } else if (message instanceof FileRegion) { @@ -50,7 +51,7 @@ final class SocketSendBufferPool { "unsupported message type: " + message.getClass()); } - private SendBuffer acquire(FileRegion src) { + protected SendBuffer acquire(FileRegion src) { if (src.getCount() == 0) { return EMPTY_BUFFER; } @@ -103,7 +104,7 @@ final class SocketSendBufferPool { return dst; } - private Preallocation getPreallocation() { + protected Preallocation getPreallocation() { Preallocation current = this.current; if (current.refCnt == 0) { current.buffer.clear(); @@ -112,8 +113,8 @@ final class SocketSendBufferPool { return getPreallocation0(); } - - private Preallocation getPreallocation0() { + + protected Preallocation getPreallocation0() { PreallocationRef ref = poolHead; if (ref != null) { do { @@ -132,7 +133,7 @@ final class SocketSendBufferPool { return new Preallocation(DEFAULT_PREALLOCATION_SIZE); } - private static int align(int pos) { + protected static int align(int pos) { int q = pos >>> ALIGN_SHIFT; int r = pos & ALIGN_MASK; if (r != 0) { @@ -141,25 +142,25 @@ final class SocketSendBufferPool { return q << ALIGN_SHIFT; } - private static final class Preallocation { - final ByteBuffer buffer; - int refCnt; + public static final class Preallocation { + public final ByteBuffer buffer; + public int refCnt; - Preallocation(int capacity) { + public Preallocation(int capacity) { buffer = ByteBuffer.allocateDirect(capacity); } } - private final class PreallocationRef extends SoftReference { + public final class PreallocationRef extends SoftReference { final PreallocationRef next; - PreallocationRef(Preallocation prealloation, PreallocationRef next) { + public PreallocationRef(Preallocation prealloation, PreallocationRef next) { super(prealloation); this.next = next; } } - interface SendBuffer { + public interface SendBuffer { boolean finished(); long writtenBytes(); long totalBytes(); @@ -170,12 +171,12 @@ final class SocketSendBufferPool { void release(); } - static class UnpooledSendBuffer implements SendBuffer { + public class UnpooledSendBuffer implements SendBuffer { - final ByteBuffer buffer; + protected final ByteBuffer buffer; final int initialPos; - UnpooledSendBuffer(ByteBuffer buffer) { + public UnpooledSendBuffer(ByteBuffer buffer) { this.buffer = buffer; initialPos = buffer.position(); } @@ -211,13 +212,13 @@ final class SocketSendBufferPool { } } - final class PooledSendBuffer implements SendBuffer { + public class PooledSendBuffer implements SendBuffer { - private final Preallocation parent; - final ByteBuffer buffer; + protected final Preallocation parent; + public final ByteBuffer buffer; final int initialPos; - PooledSendBuffer(Preallocation parent, ByteBuffer buffer) { + public PooledSendBuffer(Preallocation parent, ByteBuffer buffer) { this.parent = parent; this.buffer = buffer; initialPos = buffer.position(); @@ -260,7 +261,7 @@ final class SocketSendBufferPool { } } - static final class FileSendBuffer implements SendBuffer { + final class FileSendBuffer implements SendBuffer { private final FileRegion file; private long writtenBytes;