CRLF -> LF / Remove trailing whitespace

This commit is contained in:
Trustin Lee 2012-06-08 11:13:57 +09:00
parent 6d647feb7e
commit 6fce4539ad
24 changed files with 2529 additions and 2529 deletions

View File

@ -385,7 +385,7 @@ public final class ChannelBuffers {
* Creates a new composite buffer which wraps the readable bytes of the
* specified buffers without copying them. A modification on the content
* of the specified buffers will be visible to the returned buffer.
*
*
* @throws IllegalArgumentException
* if the specified buffers' endianness are different from each
* other
@ -399,7 +399,7 @@ public final class ChannelBuffers {
* of the specified buffers will be visible to the returned buffer.
* If gathering is <code>true</code> then gathering writes will be used when ever
* possible.
*
*
* @throws IllegalArgumentException
* if the specified buffers' endianness are different from each
* other
@ -444,7 +444,7 @@ public final class ChannelBuffers {
}
return EMPTY_BUFFER;
}
/**
* Creates a new composite buffer which wraps the slices of the specified

View File

@ -48,13 +48,13 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer {
}
/**
* Return <code>true</code> if gathering writes / reads should be used
* Return <code>true</code> if gathering writes / reads should be used
* for this {@link CompositeChannelBuffer}
*/
public boolean useGathering() {
return gathering && DetectionUtil.javaVersion() >= 7;
}
/**
* Same with {@link #slice(int, int)} except that this method returns a list.
*/
@ -140,7 +140,7 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer {
private CompositeChannelBuffer(CompositeChannelBuffer buffer) {
order = buffer.order;
this.gathering = buffer.gathering;
gathering = buffer.gathering;
components = buffer.components.clone();
indices = buffer.indices.clone();
setIndex(buffer.readerIndex(), buffer.writerIndex());

View File

@ -1,58 +1,58 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.DefaultChannelFuture;
public class ChannelRunnableWrapper extends DefaultChannelFuture implements Runnable {
private final Runnable task;
private boolean started;
public ChannelRunnableWrapper(Channel channel, Runnable task) {
super(channel, true);
this.task = task;
}
public void run() {
synchronized (this) {
if (!isCancelled()) {
started = true;
} else {
return;
}
}
try {
task.run();
setSuccess();
} catch (Throwable t) {
setFailure(t);
}
}
@Override
public synchronized boolean cancel() {
if (started) {
return false;
}
return super.cancel();
}
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.DefaultChannelFuture;
public class ChannelRunnableWrapper extends DefaultChannelFuture implements Runnable {
private final Runnable task;
private boolean started;
public ChannelRunnableWrapper(Channel channel, Runnable task) {
super(channel, true);
this.task = task;
}
public void run() {
synchronized (this) {
if (!isCancelled()) {
started = true;
} else {
return;
}
}
try {
task.run();
setSuccess();
} catch (Throwable t) {
setFailure(t);
}
}
@Override
public synchronized boolean cancel() {
if (started) {
return false;
}
return super.cancel();
}
}

View File

@ -1,33 +1,33 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket;
/**
* A {@link Worker} is responsible to dispatch IO operations
*
*/
public interface Worker extends Runnable {
/**
* Execute the given {@link Runnable} in the IO-Thread. This may be now or
* later once the IO-Thread do some other work.
*
* @param task
* the {@link Runnable} to execute
*/
void executeInIoThread(Runnable task);
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket;
/**
* A {@link Worker} is responsible to dispatch IO operations
*
*/
public interface Worker extends Runnable {
/**
* Execute the given {@link Runnable} in the IO-Thread. This may be now or
* later once the IO-Thread do some other work.
*
* @param task
* the {@link Runnable} to execute
*/
void executeInIoThread(Runnable task);
}

View File

@ -1,368 +1,368 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import static org.jboss.netty.channel.Channels.*;
import java.net.InetSocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
import org.jboss.netty.util.internal.QueueFactory;
import org.jboss.netty.util.internal.ThreadLocalBoolean;
abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChannel> extends AbstractChannel {
/**
* The {@link AbstractNioWorker}.
*/
final AbstractNioWorker worker;
/**
* Monitor object to synchronize access to InterestedOps.
*/
final Object interestOpsLock = new Object();
/**
* Monitor object for synchronizing access to the {@link WriteRequestQueue}.
*/
final Object writeLock = new Object();
/**
* WriteTask that performs write operations.
*/
final Runnable writeTask = new WriteTask();
/**
* Indicates if there is a {@link WriteTask} in the task queue.
*/
final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
/**
* Queue of write {@link MessageEvent}s.
*/
final Queue<MessageEvent> writeBufferQueue = new WriteRequestQueue();
/**
* Keeps track of the number of bytes that the {@link WriteRequestQueue} currently
* contains.
*/
final AtomicInteger writeBufferSize = new AtomicInteger();
/**
* Keeps track of the highWaterMark.
*/
final AtomicInteger highWaterMarkCounter = new AtomicInteger();
/**
* The current write {@link MessageEvent}
*/
MessageEvent currentWriteEvent;
SendBuffer currentWriteBuffer;
/**
* Boolean that indicates that write operation is in progress.
*/
boolean inWriteNowLoop;
boolean writeSuspended;
private volatile InetSocketAddress localAddress;
volatile InetSocketAddress remoteAddress;
final C channel;
protected AbstractNioChannel(Integer id, Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch) {
super(id, parent, factory, pipeline, sink);
this.worker = worker;
this.channel = ch;
}
protected AbstractNioChannel(
Channel parent, ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch) {
super(parent, factory, pipeline, sink);
this.worker = worker;
this.channel = ch;
}
/**
* Return the {@link AbstractNioWorker} that handle the IO of the
* {@link AbstractNioChannel}
*
* @return worker
*/
public AbstractNioWorker getWorker() {
return worker;
}
public InetSocketAddress getLocalAddress() {
InetSocketAddress localAddress = this.localAddress;
if (localAddress == null) {
try {
this.localAddress = localAddress = getLocalSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return localAddress;
}
public InetSocketAddress getRemoteAddress() {
InetSocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) {
try {
this.remoteAddress = remoteAddress =
getRemoteSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return remoteAddress;
}
public abstract NioChannelConfig getConfig();
int getRawInterestOps() {
return super.getInterestOps();
}
void setRawInterestOpsNow(int interestOps) {
super.setInterestOpsNow(interestOps);
}
@Override
public int getInterestOps() {
if (!isOpen()) {
return Channel.OP_WRITE;
}
int interestOps = getRawInterestOps();
int writeBufferSize = this.writeBufferSize.get();
if (writeBufferSize != 0) {
if (highWaterMarkCounter.get() > 0) {
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (writeBufferSize >= lowWaterMark) {
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
}
} else {
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (writeBufferSize >= highWaterMark) {
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
}
}
} else {
interestOps &= ~Channel.OP_WRITE;
}
return interestOps;
}
@Override
protected boolean setClosed() {
return super.setClosed();
}
abstract InetSocketAddress getLocalSocketAddress() throws Exception;
abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
private final class WriteRequestQueue implements BlockingQueue<MessageEvent> {
private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
private final BlockingQueue<MessageEvent> queue;
public WriteRequestQueue() {
this.queue = QueueFactory.createQueue(MessageEvent.class);
}
public MessageEvent remove() {
return queue.remove();
}
public MessageEvent element() {
return queue.element();
}
public MessageEvent peek() {
return queue.peek();
}
public int size() {
return queue.size();
}
public boolean isEmpty() {
return queue.isEmpty();
}
public Iterator<MessageEvent> iterator() {
return queue.iterator();
}
public Object[] toArray() {
return queue.toArray();
}
public <T> T[] toArray(T[] a) {
return queue.toArray(a);
}
public boolean containsAll(Collection<?> c) {
return queue.containsAll(c);
}
public boolean addAll(Collection<? extends MessageEvent> c) {
return queue.addAll(c);
}
public boolean removeAll(Collection<?> c) {
return queue.removeAll(c);
}
public boolean retainAll(Collection<?> c) {
return queue.retainAll(c);
}
public void clear() {
queue.clear();
}
public boolean add(MessageEvent e) {
return queue.add(e);
}
public void put(MessageEvent e) throws InterruptedException {
queue.put(e);
}
public boolean offer(MessageEvent e, long timeout, TimeUnit unit) throws InterruptedException {
return queue.offer(e, timeout, unit);
}
public MessageEvent take() throws InterruptedException {
return queue.take();
}
public MessageEvent poll(long timeout, TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
}
public int remainingCapacity() {
return queue.remainingCapacity();
}
public boolean remove(Object o) {
return queue.remove(o);
}
public boolean contains(Object o) {
return queue.contains(o);
}
public int drainTo(Collection<? super MessageEvent> c) {
return queue.drainTo(c);
}
public int drainTo(Collection<? super MessageEvent> c, int maxElements) {
return queue.drainTo(c, maxElements);
}
public boolean offer(MessageEvent e) {
boolean success = queue.offer(e);
assert success;
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (newWriteBufferSize >= highWaterMark) {
if (newWriteBufferSize - messageSize < highWaterMark) {
highWaterMarkCounter.incrementAndGet();
if (!notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
return true;
}
public MessageEvent poll() {
MessageEvent e = queue.poll();
if (e != null) {
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
if (newWriteBufferSize + messageSize >= lowWaterMark) {
highWaterMarkCounter.decrementAndGet();
if (isConnected() && !notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
}
return e;
}
private int getMessageSize(MessageEvent e) {
Object m = e.getMessage();
if (m instanceof ChannelBuffer) {
return ((ChannelBuffer) m).readableBytes();
}
return 0;
}
}
private final class WriteTask implements Runnable {
WriteTask() {
}
public void run() {
writeTaskInTaskQueue.set(false);
worker.writeFromTaskLoop(AbstractNioChannel.this);
}
}
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import static org.jboss.netty.channel.Channels.*;
import java.net.InetSocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
import org.jboss.netty.util.internal.QueueFactory;
import org.jboss.netty.util.internal.ThreadLocalBoolean;
abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChannel> extends AbstractChannel {
/**
* The {@link AbstractNioWorker}.
*/
final AbstractNioWorker worker;
/**
* Monitor object to synchronize access to InterestedOps.
*/
final Object interestOpsLock = new Object();
/**
* Monitor object for synchronizing access to the {@link WriteRequestQueue}.
*/
final Object writeLock = new Object();
/**
* WriteTask that performs write operations.
*/
final Runnable writeTask = new WriteTask();
/**
* Indicates if there is a {@link WriteTask} in the task queue.
*/
final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
/**
* Queue of write {@link MessageEvent}s.
*/
final Queue<MessageEvent> writeBufferQueue = new WriteRequestQueue();
/**
* Keeps track of the number of bytes that the {@link WriteRequestQueue} currently
* contains.
*/
final AtomicInteger writeBufferSize = new AtomicInteger();
/**
* Keeps track of the highWaterMark.
*/
final AtomicInteger highWaterMarkCounter = new AtomicInteger();
/**
* The current write {@link MessageEvent}
*/
MessageEvent currentWriteEvent;
SendBuffer currentWriteBuffer;
/**
* Boolean that indicates that write operation is in progress.
*/
boolean inWriteNowLoop;
boolean writeSuspended;
private volatile InetSocketAddress localAddress;
volatile InetSocketAddress remoteAddress;
final C channel;
protected AbstractNioChannel(Integer id, Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch) {
super(id, parent, factory, pipeline, sink);
this.worker = worker;
this.channel = ch;
}
protected AbstractNioChannel(
Channel parent, ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch) {
super(parent, factory, pipeline, sink);
this.worker = worker;
this.channel = ch;
}
/**
* Return the {@link AbstractNioWorker} that handle the IO of the
* {@link AbstractNioChannel}
*
* @return worker
*/
public AbstractNioWorker getWorker() {
return worker;
}
public InetSocketAddress getLocalAddress() {
InetSocketAddress localAddress = this.localAddress;
if (localAddress == null) {
try {
this.localAddress = localAddress = getLocalSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return localAddress;
}
public InetSocketAddress getRemoteAddress() {
InetSocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) {
try {
this.remoteAddress = remoteAddress =
getRemoteSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return remoteAddress;
}
public abstract NioChannelConfig getConfig();
int getRawInterestOps() {
return super.getInterestOps();
}
void setRawInterestOpsNow(int interestOps) {
super.setInterestOpsNow(interestOps);
}
@Override
public int getInterestOps() {
if (!isOpen()) {
return Channel.OP_WRITE;
}
int interestOps = getRawInterestOps();
int writeBufferSize = this.writeBufferSize.get();
if (writeBufferSize != 0) {
if (highWaterMarkCounter.get() > 0) {
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (writeBufferSize >= lowWaterMark) {
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
}
} else {
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (writeBufferSize >= highWaterMark) {
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
}
}
} else {
interestOps &= ~Channel.OP_WRITE;
}
return interestOps;
}
@Override
protected boolean setClosed() {
return super.setClosed();
}
abstract InetSocketAddress getLocalSocketAddress() throws Exception;
abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
private final class WriteRequestQueue implements BlockingQueue<MessageEvent> {
private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
private final BlockingQueue<MessageEvent> queue;
public WriteRequestQueue() {
this.queue = QueueFactory.createQueue(MessageEvent.class);
}
public MessageEvent remove() {
return queue.remove();
}
public MessageEvent element() {
return queue.element();
}
public MessageEvent peek() {
return queue.peek();
}
public int size() {
return queue.size();
}
public boolean isEmpty() {
return queue.isEmpty();
}
public Iterator<MessageEvent> iterator() {
return queue.iterator();
}
public Object[] toArray() {
return queue.toArray();
}
public <T> T[] toArray(T[] a) {
return queue.toArray(a);
}
public boolean containsAll(Collection<?> c) {
return queue.containsAll(c);
}
public boolean addAll(Collection<? extends MessageEvent> c) {
return queue.addAll(c);
}
public boolean removeAll(Collection<?> c) {
return queue.removeAll(c);
}
public boolean retainAll(Collection<?> c) {
return queue.retainAll(c);
}
public void clear() {
queue.clear();
}
public boolean add(MessageEvent e) {
return queue.add(e);
}
public void put(MessageEvent e) throws InterruptedException {
queue.put(e);
}
public boolean offer(MessageEvent e, long timeout, TimeUnit unit) throws InterruptedException {
return queue.offer(e, timeout, unit);
}
public MessageEvent take() throws InterruptedException {
return queue.take();
}
public MessageEvent poll(long timeout, TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
}
public int remainingCapacity() {
return queue.remainingCapacity();
}
public boolean remove(Object o) {
return queue.remove(o);
}
public boolean contains(Object o) {
return queue.contains(o);
}
public int drainTo(Collection<? super MessageEvent> c) {
return queue.drainTo(c);
}
public int drainTo(Collection<? super MessageEvent> c, int maxElements) {
return queue.drainTo(c, maxElements);
}
public boolean offer(MessageEvent e) {
boolean success = queue.offer(e);
assert success;
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (newWriteBufferSize >= highWaterMark) {
if (newWriteBufferSize - messageSize < highWaterMark) {
highWaterMarkCounter.incrementAndGet();
if (!notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
return true;
}
public MessageEvent poll() {
MessageEvent e = queue.poll();
if (e != null) {
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
if (newWriteBufferSize + messageSize >= lowWaterMark) {
highWaterMarkCounter.decrementAndGet();
if (isConnected() && !notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
}
return e;
}
private int getMessageSize(MessageEvent e) {
Object m = e.getMessage();
if (m instanceof ChannelBuffer) {
return ((ChannelBuffer) m).readableBytes();
}
return 0;
}
}
private final class WriteTask implements Runnable {
WriteTask() {
}
public void run() {
writeTaskInTaskQueue.set(false);
worker.writeFromTaskLoop(AbstractNioChannel.this);
}
}
}

View File

@ -1,52 +1,52 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.ChannelRunnableWrapper;
public abstract class AbstractNioChannelSink extends AbstractChannelSink {
@Override
public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) {
Channel ch = pipeline.getChannel();
if (ch instanceof AbstractNioChannel<?>) {
AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch;
ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task);
channel.worker.executeInIoThread(wrapper);
return wrapper;
}
return super.execute(pipeline, task);
}
@Override
protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) {
Channel channel = event.getChannel();
boolean fireLater = false;
if (channel instanceof AbstractNioChannel<?>) {
fireLater = !AbstractNioWorker.isIoThread((AbstractNioChannel<?>) channel);
}
return fireLater;
}
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.ChannelRunnableWrapper;
public abstract class AbstractNioChannelSink extends AbstractChannelSink {
@Override
public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) {
Channel ch = pipeline.getChannel();
if (ch instanceof AbstractNioChannel<?>) {
AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch;
ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task);
channel.worker.executeInIoThread(wrapper);
return wrapper;
}
return super.execute(pipeline, task);
}
@Override
protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) {
Channel channel = event.getChannel();
boolean fireLater = false;
if (channel instanceof AbstractNioChannel<?>) {
fireLater = !AbstractNioWorker.isIoThread((AbstractNioChannel<?>) channel);
}
return fireLater;
}
}

View File

@ -1,83 +1,83 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.socket.Worker;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.internal.ExecutorUtil;
/**
* Abstract base class for {@link WorkerPool} implementations that create the {@link Worker}'s up-front and return them in a "fair" fashion when calling
* {@link #nextWorker()}
*
*/
public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker> implements WorkerPool<E> , ExternalResourceReleasable {
private final AbstractNioWorker[] workers;
private final AtomicInteger workerIndex = new AtomicInteger();
private final Executor workerExecutor;
/**
* Create a new instance
*
* @param workerExecutor the {@link Executor} to use for the {@link Worker}'s
* @param allowShutdownOnIdle allow the {@link Worker}'s to shutdown when there is not {@link Channel} is registered with it
* @param workerCount the count of {@link Worker}'s to create
*/
AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean allowShutDownOnIdle) {
if (workerExecutor == null) {
throw new NullPointerException("workerExecutor");
}
if (workerCount <= 0) {
throw new IllegalArgumentException(
"workerCount (" + workerCount + ") " +
"must be a positive integer.");
}
workers = new AbstractNioWorker[workerCount];
for (int i = 0; i < workers.length; i++) {
workers[i] = createWorker(workerExecutor, allowShutDownOnIdle);
}
this.workerExecutor = workerExecutor;
}
/**
* Create a new {@link Worker} which uses the given {@link Executor} to service IO
*
*
* @param executor the {@link Executor} to use
* @param allowShutdownOnIdle allow the {@link Worker} to shutdown when there is not {@link Channel} is registered with it
* @return worker the new {@link Worker}
*/
protected abstract E createWorker(Executor executor, boolean allowShutdownOnIdle);
@SuppressWarnings("unchecked")
public E nextWorker() {
return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
}
public void releaseExternalResources() {
ExecutorUtil.terminate(workerExecutor);
}
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.socket.Worker;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.internal.ExecutorUtil;
/**
* Abstract base class for {@link WorkerPool} implementations that create the {@link Worker}'s up-front and return them in a "fair" fashion when calling
* {@link #nextWorker()}
*
*/
public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker> implements WorkerPool<E> , ExternalResourceReleasable {
private final AbstractNioWorker[] workers;
private final AtomicInteger workerIndex = new AtomicInteger();
private final Executor workerExecutor;
/**
* Create a new instance
*
* @param workerExecutor the {@link Executor} to use for the {@link Worker}'s
* @param allowShutdownOnIdle allow the {@link Worker}'s to shutdown when there is not {@link Channel} is registered with it
* @param workerCount the count of {@link Worker}'s to create
*/
AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean allowShutDownOnIdle) {
if (workerExecutor == null) {
throw new NullPointerException("workerExecutor");
}
if (workerCount <= 0) {
throw new IllegalArgumentException(
"workerCount (" + workerCount + ") " +
"must be a positive integer.");
}
workers = new AbstractNioWorker[workerCount];
for (int i = 0; i < workers.length; i++) {
workers[i] = createWorker(workerExecutor, allowShutDownOnIdle);
}
this.workerExecutor = workerExecutor;
}
/**
* Create a new {@link Worker} which uses the given {@link Executor} to service IO
*
*
* @param executor the {@link Executor} to use
* @param allowShutdownOnIdle allow the {@link Worker} to shutdown when there is not {@link Channel} is registered with it
* @return worker the new {@link Worker}
*/
protected abstract E createWorker(Executor executor, boolean allowShutdownOnIdle);
@SuppressWarnings("unchecked")
public E nextWorker() {
return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
}
public void releaseExternalResources() {
ExecutorUtil.terminate(workerExecutor);
}
}

View File

@ -1,82 +1,82 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelConfig;
/**
* Special {@link ChannelConfig} sub-type which offers extra methods which are useful for NIO.
*
*/
public interface NioChannelConfig extends ChannelConfig {
/**
* Returns the high water mark of the write buffer. If the number of bytes
* queued in the write buffer exceeds this value, {@link Channel#isWritable()}
* will start to return {@code true}.
*/
int getWriteBufferHighWaterMark();
/**
* Sets the high water mark of the write buffer. If the number of bytes
* queued in the write buffer exceeds this value, {@link Channel#isWritable()}
* will start to return {@code true}.
*/
void setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
/**
* Returns the low water mark of the write buffer. Once the number of bytes
* queued in the write buffer exceeded the
* {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
* dropped down below this value, {@link Channel#isWritable()} will return
* {@code false} again.
*/
int getWriteBufferLowWaterMark();
/**
* Sets the low water mark of the write buffer. Once the number of bytes
* queued in the write buffer exceeded the
* {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
* dropped down below this value, {@link Channel#isWritable()} will return
* {@code false} again.
*/
void setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
/**
* Returns the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*/
int getWriteSpinCount();
/**
* Sets the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*
* @throws IllegalArgumentException
* if the specified value is {@code 0} or less than {@code 0}
*/
void setWriteSpinCount(int writeSpinCount);
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelConfig;
/**
* Special {@link ChannelConfig} sub-type which offers extra methods which are useful for NIO.
*
*/
public interface NioChannelConfig extends ChannelConfig {
/**
* Returns the high water mark of the write buffer. If the number of bytes
* queued in the write buffer exceeds this value, {@link Channel#isWritable()}
* will start to return {@code true}.
*/
int getWriteBufferHighWaterMark();
/**
* Sets the high water mark of the write buffer. If the number of bytes
* queued in the write buffer exceeds this value, {@link Channel#isWritable()}
* will start to return {@code true}.
*/
void setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
/**
* Returns the low water mark of the write buffer. Once the number of bytes
* queued in the write buffer exceeded the
* {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
* dropped down below this value, {@link Channel#isWritable()} will return
* {@code false} again.
*/
int getWriteBufferLowWaterMark();
/**
* Sets the low water mark of the write buffer. Once the number of bytes
* queued in the write buffer exceeded the
* {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
* dropped down below this value, {@link Channel#isWritable()} will return
* {@code false} again.
*/
void setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
/**
* Returns the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*/
int getWriteSpinCount();
/**
* Sets the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*
* @throws IllegalArgumentException
* if the specified value is {@code 0} or less than {@code 0}
*/
void setWriteSpinCount(int writeSpinCount);
}

View File

@ -1,37 +1,37 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import java.util.concurrent.Executor;
/**
* Default implementation which hands of {@link NioDatagramWorker}'s
*
*
*/
public class NioDatagramWorkerPool extends AbstractNioWorkerPool<NioDatagramWorker> {
public NioDatagramWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) {
super(executor, workerCount, allowShutdownOnIdle);
}
@Override
protected NioDatagramWorker createWorker(Executor executor, boolean allowShutdownOnIdle) {
return new NioDatagramWorker(executor, allowShutdownOnIdle);
}
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import java.util.concurrent.Executor;
/**
* Default implementation which hands of {@link NioDatagramWorker}'s
*
*
*/
public class NioDatagramWorkerPool extends AbstractNioWorkerPool<NioDatagramWorker> {
public NioDatagramWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) {
super(executor, workerCount, allowShutdownOnIdle);
}
@Override
protected NioDatagramWorker createWorker(Executor executor, boolean allowShutdownOnIdle) {
return new NioDatagramWorker(executor, allowShutdownOnIdle);
}
}

View File

@ -1,37 +1,37 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import java.util.concurrent.Executor;
/**
* Default implementation which hands of {@link NioWorker}'s
*
*
*/
public class NioWorkerPool extends AbstractNioWorkerPool<NioWorker> {
public NioWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) {
super(executor, workerCount, allowShutdownOnIdle);
}
@Override
protected NioWorker createWorker(Executor executor, boolean allowShutdownOnIdle) {
return new NioWorker(executor, allowShutdownOnIdle);
}
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import java.util.concurrent.Executor;
/**
* Default implementation which hands of {@link NioWorker}'s
*
*
*/
public class NioWorkerPool extends AbstractNioWorkerPool<NioWorker> {
public NioWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) {
super(executor, workerCount, allowShutdownOnIdle);
}
@Override
protected NioWorker createWorker(Executor executor, boolean allowShutdownOnIdle) {
return new NioWorker(executor, allowShutdownOnIdle);
}
}

View File

@ -1,47 +1,47 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.socket.Worker;
import org.jboss.netty.util.ExternalResourceReleasable;
/**
* This implementation of a {@link WorkerPool} should be used if you plan to share a {@link WorkerPool} between different Factories. You will need to call {@link #destroy()} by your own once
* you want to release any resources of it.
*
*
*/
public final class ShareableWorkerPool<E extends Worker> implements WorkerPool<E> {
private final WorkerPool<E> wrapped;
public ShareableWorkerPool(WorkerPool<E> wrapped) {
this.wrapped = wrapped;
}
public E nextWorker() {
return wrapped.nextWorker();
}
/**
* Destroy the {@link ShareableWorkerPool} and release all resources. After this is called its not usable anymore
*/
public void destroy() {
if (wrapped instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) wrapped).releaseExternalResources();
}
}
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.socket.Worker;
import org.jboss.netty.util.ExternalResourceReleasable;
/**
* This implementation of a {@link WorkerPool} should be used if you plan to share a {@link WorkerPool} between different Factories. You will need to call {@link #destroy()} by your own once
* you want to release any resources of it.
*
*
*/
public final class ShareableWorkerPool<E extends Worker> implements WorkerPool<E> {
private final WorkerPool<E> wrapped;
public ShareableWorkerPool(WorkerPool<E> wrapped) {
this.wrapped = wrapped;
}
public E nextWorker() {
return wrapped.nextWorker();
}
/**
* Destroy the {@link ShareableWorkerPool} and release all resources. After this is called its not usable anymore
*/
public void destroy() {
if (wrapped instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) wrapped).releaseExternalResources();
}
}
}

View File

@ -1,35 +1,35 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.socket.Worker;
/**
* The {@link WorkerPool} is responsible to hand of {@link Worker}'s on demand
*
*/
public interface WorkerPool<E extends Worker> {
/**
* Return the next {@link Worker} to use
*
* @return worker
*/
E nextWorker();
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.socket.Worker;
/**
* The {@link WorkerPool} is responsible to hand of {@link Worker}'s on demand
*
*/
public interface WorkerPool<E extends Worker> {
/**
* Return the next {@link Worker} to use
*
* @return worker
*/
E nextWorker();
}

View File

@ -1,117 +1,117 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.oio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.socket.Worker;
abstract class AbstractOioChannel extends AbstractChannel {
private volatile InetSocketAddress localAddress;
volatile InetSocketAddress remoteAddress;
volatile Thread workerThread;
volatile Worker worker;
final Object interestOpsLock = new Object();
AbstractOioChannel(
Channel parent,
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink) {
super(parent, factory, pipeline, sink);
}
@Override
protected boolean setClosed() {
return super.setClosed();
}
@Override
protected void setInterestOpsNow(int interestOps) {
super.setInterestOpsNow(interestOps);
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
return super.write(message, null);
} else {
return super.write(message, remoteAddress);
}
}
public boolean isBound() {
return isOpen() && isSocketBound();
}
public boolean isConnected() {
return isOpen() && isSocketConnected();
}
public InetSocketAddress getLocalAddress() {
InetSocketAddress localAddress = this.localAddress;
if (localAddress == null) {
try {
this.localAddress = localAddress = getLocalSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return localAddress;
}
public InetSocketAddress getRemoteAddress() {
InetSocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) {
try {
this.remoteAddress = remoteAddress =
getRemoteSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return remoteAddress;
}
abstract boolean isSocketBound();
abstract boolean isSocketConnected();
abstract boolean isSocketClosed();
abstract InetSocketAddress getLocalSocketAddress() throws Exception;
abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
abstract void closeSocket() throws IOException;
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.oio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.socket.Worker;
abstract class AbstractOioChannel extends AbstractChannel {
private volatile InetSocketAddress localAddress;
volatile InetSocketAddress remoteAddress;
volatile Thread workerThread;
volatile Worker worker;
final Object interestOpsLock = new Object();
AbstractOioChannel(
Channel parent,
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink) {
super(parent, factory, pipeline, sink);
}
@Override
protected boolean setClosed() {
return super.setClosed();
}
@Override
protected void setInterestOpsNow(int interestOps) {
super.setInterestOpsNow(interestOps);
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
return super.write(message, null);
} else {
return super.write(message, remoteAddress);
}
}
public boolean isBound() {
return isOpen() && isSocketBound();
}
public boolean isConnected() {
return isOpen() && isSocketConnected();
}
public InetSocketAddress getLocalAddress() {
InetSocketAddress localAddress = this.localAddress;
if (localAddress == null) {
try {
this.localAddress = localAddress = getLocalSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return localAddress;
}
public InetSocketAddress getRemoteAddress() {
InetSocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) {
try {
this.remoteAddress = remoteAddress =
getRemoteSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return remoteAddress;
}
abstract boolean isSocketBound();
abstract boolean isSocketConnected();
abstract boolean isSocketClosed();
abstract InetSocketAddress getLocalSocketAddress() throws Exception;
abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
abstract void closeSocket() throws IOException;
}

View File

@ -1,57 +1,57 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.oio;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.ChannelRunnableWrapper;
import org.jboss.netty.channel.socket.Worker;
public abstract class AbstractOioChannelSink extends AbstractChannelSink {
@Override
public ChannelFuture execute(final ChannelPipeline pipeline, final Runnable task) {
Channel ch = pipeline.getChannel();
if (ch instanceof AbstractOioChannel) {
AbstractOioChannel channel = (AbstractOioChannel) ch;
Worker worker = channel.worker;
if (worker != null) {
ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task);
channel.worker.executeInIoThread(wrapper);
return wrapper;
}
}
return super.execute(pipeline, task);
}
@Override
protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) {
Channel channel = event.getChannel();
boolean fireLater = false;
if (channel instanceof AbstractOioChannel) {
fireLater = !AbstractOioWorker.isIoThread((AbstractOioChannel) channel);
}
return fireLater;
}
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.oio;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.ChannelRunnableWrapper;
import org.jboss.netty.channel.socket.Worker;
public abstract class AbstractOioChannelSink extends AbstractChannelSink {
@Override
public ChannelFuture execute(final ChannelPipeline pipeline, final Runnable task) {
Channel ch = pipeline.getChannel();
if (ch instanceof AbstractOioChannel) {
AbstractOioChannel channel = (AbstractOioChannel) ch;
Worker worker = channel.worker;
if (worker != null) {
ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task);
channel.worker.executeInIoThread(wrapper);
return wrapper;
}
}
return super.execute(pipeline, task);
}
@Override
protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) {
Channel channel = event.getChannel();
boolean fireLater = false;
if (channel instanceof AbstractOioChannel) {
fireLater = !AbstractOioWorker.isIoThread((AbstractOioChannel) channel);
}
return fireLater;
}
}

View File

@ -1,241 +1,241 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.oio;
import static org.jboss.netty.channel.Channels.*;
import java.io.IOException;
import java.util.Queue;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.Worker;
import org.jboss.netty.util.internal.QueueFactory;
/**
* Abstract base class for Oio-Worker implementations
*
* @param <C> {@link AbstractOioChannel}
*/
abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker {
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
protected final C channel;
/**
* If this worker has been started thread will be a reference to the thread
* used when starting. i.e. the current thread when the run method is executed.
*/
protected volatile Thread thread;
private volatile boolean done;
public AbstractOioWorker(C channel) {
this.channel = channel;
channel.worker = this;
}
public void run() {
thread = channel.workerThread = Thread.currentThread();
while (channel.isOpen()) {
synchronized (channel.interestOpsLock) {
while (!channel.isReadable()) {
try {
// notify() is not called at all.
// close() and setInterestOps() calls Thread.interrupt()
channel.interestOpsLock.wait();
} catch (InterruptedException e) {
if (!channel.isOpen()) {
break;
}
}
}
}
boolean cont = false;
try {
cont = process();
} catch (Throwable t) {
if (!channel.isSocketClosed()) {
fireExceptionCaught(channel, t);
}
} finally {
processEventQueue();
if (!cont) {
break;
}
}
}
// Setting the workerThread to null will prevent any channel
// operations from interrupting this thread from now on.
channel.workerThread = null;
// Clean up.
close(channel, succeededFuture(channel), true);
// Mark the worker event loop as done so we know that we need to run tasks directly and not queue them
// See #287
done = true;
// just to make we don't have something left
processEventQueue();
}
static boolean isIoThread(AbstractOioChannel channel) {
return Thread.currentThread() == channel.workerThread;
}
public void executeInIoThread(Runnable task) {
// check if the current thread is the worker thread
//
// Also check if the event loop of the worker is complete. If so we need to run the task now.
// See #287
if (Thread.currentThread() == thread || done) {
task.run();
} else {
boolean added = eventQueue.offer(task);
if (added) {
// as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest
}
}
}
private void processEventQueue() {
for (;;) {
final Runnable task = eventQueue.poll();
if (task == null) {
break;
}
task.run();
}
}
/**
* Process the incoming messages and also is responsible for call {@link Channels#fireMessageReceived(Channel, Object)} once a message
* was processed without errors.
*
* @return continue returns <code>true</code> as long as this worker should continue to try processing incoming messages
* @throws IOException
*/
abstract boolean process() throws IOException;
static void setInterestOps(
AbstractOioChannel channel, ChannelFuture future, int interestOps) {
boolean iothread = isIoThread(channel);
// Override OP_WRITE flag - a user cannot change this flag.
interestOps &= ~Channel.OP_WRITE;
interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
boolean changed = false;
try {
if (channel.getInterestOps() != interestOps) {
if ((interestOps & Channel.OP_READ) != 0) {
channel.setInterestOpsNow(Channel.OP_READ);
} else {
channel.setInterestOpsNow(Channel.OP_NONE);
}
changed = true;
}
future.setSuccess();
if (changed) {
synchronized (channel.interestOpsLock) {
channel.setInterestOpsNow(interestOps);
// Notify the worker so it stops or continues reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
}
}
if (iothread) {
fireChannelInterestChanged(channel);
} else {
fireChannelInterestChangedLater(channel);
}
}
} catch (Throwable t) {
future.setFailure(t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
static void close(AbstractOioChannel channel, ChannelFuture future) {
close(channel, future, isIoThread(channel));
}
private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) {
boolean connected = channel.isConnected();
boolean bound = channel.isBound();
try {
channel.closeSocket();
if (channel.setClosed()) {
future.setSuccess();
if (connected) {
// Notify the worker so it stops reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
}
if (iothread) {
fireChannelDisconnected(channel);
} else {
fireChannelDisconnectedLater(channel);
}
}
if (bound) {
if (iothread) {
fireChannelUnbound(channel);
} else {
fireChannelUnboundLater(channel);
}
}
if (iothread) {
fireChannelClosed(channel);
} else {
fireChannelClosedLater(channel);
}
} else {
future.setSuccess();
}
} catch (Throwable t) {
future.setFailure(t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.oio;
import static org.jboss.netty.channel.Channels.*;
import java.io.IOException;
import java.util.Queue;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.Worker;
import org.jboss.netty.util.internal.QueueFactory;
/**
* Abstract base class for Oio-Worker implementations
*
* @param <C> {@link AbstractOioChannel}
*/
abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker {
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
protected final C channel;
/**
* If this worker has been started thread will be a reference to the thread
* used when starting. i.e. the current thread when the run method is executed.
*/
protected volatile Thread thread;
private volatile boolean done;
public AbstractOioWorker(C channel) {
this.channel = channel;
channel.worker = this;
}
public void run() {
thread = channel.workerThread = Thread.currentThread();
while (channel.isOpen()) {
synchronized (channel.interestOpsLock) {
while (!channel.isReadable()) {
try {
// notify() is not called at all.
// close() and setInterestOps() calls Thread.interrupt()
channel.interestOpsLock.wait();
} catch (InterruptedException e) {
if (!channel.isOpen()) {
break;
}
}
}
}
boolean cont = false;
try {
cont = process();
} catch (Throwable t) {
if (!channel.isSocketClosed()) {
fireExceptionCaught(channel, t);
}
} finally {
processEventQueue();
if (!cont) {
break;
}
}
}
// Setting the workerThread to null will prevent any channel
// operations from interrupting this thread from now on.
channel.workerThread = null;
// Clean up.
close(channel, succeededFuture(channel), true);
// Mark the worker event loop as done so we know that we need to run tasks directly and not queue them
// See #287
done = true;
// just to make we don't have something left
processEventQueue();
}
static boolean isIoThread(AbstractOioChannel channel) {
return Thread.currentThread() == channel.workerThread;
}
public void executeInIoThread(Runnable task) {
// check if the current thread is the worker thread
//
// Also check if the event loop of the worker is complete. If so we need to run the task now.
// See #287
if (Thread.currentThread() == thread || done) {
task.run();
} else {
boolean added = eventQueue.offer(task);
if (added) {
// as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest
}
}
}
private void processEventQueue() {
for (;;) {
final Runnable task = eventQueue.poll();
if (task == null) {
break;
}
task.run();
}
}
/**
* Process the incoming messages and also is responsible for call {@link Channels#fireMessageReceived(Channel, Object)} once a message
* was processed without errors.
*
* @return continue returns <code>true</code> as long as this worker should continue to try processing incoming messages
* @throws IOException
*/
abstract boolean process() throws IOException;
static void setInterestOps(
AbstractOioChannel channel, ChannelFuture future, int interestOps) {
boolean iothread = isIoThread(channel);
// Override OP_WRITE flag - a user cannot change this flag.
interestOps &= ~Channel.OP_WRITE;
interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
boolean changed = false;
try {
if (channel.getInterestOps() != interestOps) {
if ((interestOps & Channel.OP_READ) != 0) {
channel.setInterestOpsNow(Channel.OP_READ);
} else {
channel.setInterestOpsNow(Channel.OP_NONE);
}
changed = true;
}
future.setSuccess();
if (changed) {
synchronized (channel.interestOpsLock) {
channel.setInterestOpsNow(interestOps);
// Notify the worker so it stops or continues reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
}
}
if (iothread) {
fireChannelInterestChanged(channel);
} else {
fireChannelInterestChangedLater(channel);
}
}
} catch (Throwable t) {
future.setFailure(t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
static void close(AbstractOioChannel channel, ChannelFuture future) {
close(channel, future, isIoThread(channel));
}
private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) {
boolean connected = channel.isConnected();
boolean bound = channel.isBound();
try {
channel.closeSocket();
if (channel.setClosed()) {
future.setSuccess();
if (connected) {
// Notify the worker so it stops reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
}
if (iothread) {
fireChannelDisconnected(channel);
} else {
fireChannelDisconnectedLater(channel);
}
}
if (bound) {
if (iothread) {
fireChannelUnbound(channel);
} else {
fireChannelUnboundLater(channel);
}
}
if (iothread) {
fireChannelClosed(channel);
} else {
fireChannelClosedLater(channel);
}
} else {
future.setSuccess();
}
} catch (Throwable t) {
future.setFailure(t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
}

View File

@ -1,83 +1,83 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
import java.util.concurrent.Executor;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.internal.ExecutorUtil;
/**
* A special {@link Executor} which allows to chain a series of
* {@link Executor}s and {@link ChannelEventRunnableFilter}.
*/
public class ChainedExecutor implements Executor, ExternalResourceReleasable {
private final Executor cur;
private final Executor next;
private final ChannelEventRunnableFilter filter;
/**
* Create a new {@link ChainedExecutor} which will used the given {@link ChannelEventRunnableFilter} to see if the {@link #cur} {@link Executor} should get used.
* Otherwise it will pass the work to the {@link #next} {@link Executor}
*
* @param filter the {@link ChannelEventRunnableFilter} which will be used to check if the {@link ChannelEventRunnable} should be passed to the cur or next {@link Executor}
* @param cur the {@link Executor} to use if the {@link ChannelEventRunnableFilter} match
* @param next the {@link Executor} to use if the {@link ChannelEventRunnableFilter} does not match
*/
public ChainedExecutor(ChannelEventRunnableFilter filter, Executor cur, Executor next) {
if (filter == null) {
throw new NullPointerException("filter");
}
if (cur == null) {
throw new NullPointerException("cur");
}
if (next == null) {
throw new NullPointerException("next");
}
this.filter = filter;
this.cur = cur;
this.next = next;
}
/**
* Execute the passed {@link ChannelEventRunnable} with the current {@link Executor} if the {@link ChannelEventRunnableFilter} match.
* Otherwise pass it to the next {@link Executor} in the chain.
*/
public void execute(Runnable command) {
assert command instanceof ChannelEventRunnable;
if (filter.filter((ChannelEventRunnable) command)) {
cur.execute(command);
} else {
next.execute(command);
}
}
public void releaseExternalResources() {
ExecutorUtil.terminate(cur, next);
releaseExternal(cur);
releaseExternal(next);
}
private static void releaseExternal(Executor executor) {
if (executor instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) executor).releaseExternalResources();
}
}
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
import java.util.concurrent.Executor;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.internal.ExecutorUtil;
/**
* A special {@link Executor} which allows to chain a series of
* {@link Executor}s and {@link ChannelEventRunnableFilter}.
*/
public class ChainedExecutor implements Executor, ExternalResourceReleasable {
private final Executor cur;
private final Executor next;
private final ChannelEventRunnableFilter filter;
/**
* Create a new {@link ChainedExecutor} which will used the given {@link ChannelEventRunnableFilter} to see if the {@link #cur} {@link Executor} should get used.
* Otherwise it will pass the work to the {@link #next} {@link Executor}
*
* @param filter the {@link ChannelEventRunnableFilter} which will be used to check if the {@link ChannelEventRunnable} should be passed to the cur or next {@link Executor}
* @param cur the {@link Executor} to use if the {@link ChannelEventRunnableFilter} match
* @param next the {@link Executor} to use if the {@link ChannelEventRunnableFilter} does not match
*/
public ChainedExecutor(ChannelEventRunnableFilter filter, Executor cur, Executor next) {
if (filter == null) {
throw new NullPointerException("filter");
}
if (cur == null) {
throw new NullPointerException("cur");
}
if (next == null) {
throw new NullPointerException("next");
}
this.filter = filter;
this.cur = cur;
this.next = next;
}
/**
* Execute the passed {@link ChannelEventRunnable} with the current {@link Executor} if the {@link ChannelEventRunnableFilter} match.
* Otherwise pass it to the next {@link Executor} in the chain.
*/
public void execute(Runnable command) {
assert command instanceof ChannelEventRunnable;
if (filter.filter((ChannelEventRunnable) command)) {
cur.execute(command);
} else {
next.execute(command);
}
}
public void releaseExternalResources() {
ExecutorUtil.terminate(cur, next);
releaseExternal(cur);
releaseExternal(next);
}
private static void releaseExternal(Executor executor) {
if (executor instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) executor).releaseExternalResources();
}
}
}

View File

@ -35,6 +35,6 @@ public class ChannelDownstreamEventRunnable extends ChannelEventRunnable {
*/
@Override
protected void doRun() {
ctx.sendDownstream(e);
ctx.sendDownstream(e);
}
}

View File

@ -1,27 +1,27 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
/**
* {@link ChannelEventRunnableFilter} implementation which matches {@link ChannelDownstreamEventRunnable}
*
*/
public class ChannelDownstreamEventRunnableFilter implements ChannelEventRunnableFilter {
public boolean filter(ChannelEventRunnable event) {
return event instanceof ChannelDownstreamEventRunnable;
}
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
/**
* {@link ChannelEventRunnableFilter} implementation which matches {@link ChannelDownstreamEventRunnable}
*
*/
public class ChannelDownstreamEventRunnableFilter implements ChannelEventRunnableFilter {
public boolean filter(ChannelEventRunnable event) {
return event instanceof ChannelDownstreamEventRunnable;
}
}

View File

@ -1,72 +1,72 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.util.EstimatableObjectWrapper;
import org.jboss.netty.util.internal.DeadLockProofWorker;
public abstract class ChannelEventRunnable implements Runnable, EstimatableObjectWrapper {
protected final ChannelHandlerContext ctx;
protected final ChannelEvent e;
int estimatedSize;
private Executor executor;
/**
* Creates a {@link Runnable} which sends the specified {@link ChannelEvent}
* upstream via the specified {@link ChannelHandlerContext}.
*/
public ChannelEventRunnable(ChannelHandlerContext ctx, ChannelEvent e, Executor executor) {
this.ctx = ctx;
this.e = e;
this.executor = executor;
}
/**
* Returns the {@link ChannelHandlerContext} which will be used to
* send the {@link ChannelEvent} upstream.
*/
public ChannelHandlerContext getContext() {
return ctx;
}
/**
* Returns the {@link ChannelEvent} which will be sent upstream.
*/
public ChannelEvent getEvent() {
return e;
}
public Object unwrap() {
return e;
}
public final void run() {
try {
DeadLockProofWorker.PARENT.set(executor);
doRun();
} finally {
DeadLockProofWorker.PARENT.remove();
}
}
protected abstract void doRun();
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.util.EstimatableObjectWrapper;
import org.jboss.netty.util.internal.DeadLockProofWorker;
public abstract class ChannelEventRunnable implements Runnable, EstimatableObjectWrapper {
protected final ChannelHandlerContext ctx;
protected final ChannelEvent e;
int estimatedSize;
private final Executor executor;
/**
* Creates a {@link Runnable} which sends the specified {@link ChannelEvent}
* upstream via the specified {@link ChannelHandlerContext}.
*/
public ChannelEventRunnable(ChannelHandlerContext ctx, ChannelEvent e, Executor executor) {
this.ctx = ctx;
this.e = e;
this.executor = executor;
}
/**
* Returns the {@link ChannelHandlerContext} which will be used to
* send the {@link ChannelEvent} upstream.
*/
public ChannelHandlerContext getContext() {
return ctx;
}
/**
* Returns the {@link ChannelEvent} which will be sent upstream.
*/
public ChannelEvent getEvent() {
return e;
}
public Object unwrap() {
return e;
}
public final void run() {
try {
DeadLockProofWorker.PARENT.set(executor);
doRun();
} finally {
DeadLockProofWorker.PARENT.remove();
}
}
protected abstract void doRun();
}

View File

@ -1,27 +1,27 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
import java.util.concurrent.Executor;
public interface ChannelEventRunnableFilter {
/**
* Return <code>true</code> if the {@link ChannelEventRunnable} should get handled by the {@link Executor}
*
*/
boolean filter(ChannelEventRunnable event);
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
import java.util.concurrent.Executor;
public interface ChannelEventRunnableFilter {
/**
* Return <code>true</code> if the {@link ChannelEventRunnable} should get handled by the {@link Executor}
*
*/
boolean filter(ChannelEventRunnable event);
}

View File

@ -1,25 +1,25 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
/**
* {@link ChannelEventRunnableFilter} which matches {@link ChannelDownstreamEventRunnable}
*/
public class ChannelUpstreamEventRunnableFilter implements ChannelEventRunnableFilter {
public boolean filter(ChannelEventRunnable event) {
return event instanceof ChannelDownstreamEventRunnable;
}
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
/**
* {@link ChannelEventRunnableFilter} which matches {@link ChannelDownstreamEventRunnable}
*/
public class ChannelUpstreamEventRunnableFilter implements ChannelEventRunnableFilter {
public boolean filter(ChannelEventRunnable event) {
return event instanceof ChannelDownstreamEventRunnable;
}
}

View File

@ -1,117 +1,117 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.util.internal;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Deflater;
/**
* Utility that detects various properties specific to the current runtime
* environment, such as Java version and the availability of the
* {@code sun.misc.Unsafe} object.
*
* <br>
* You can disable the use of {@code sun.misc.Unsafe} if you specify
* the System property <strong>org.jboss.netty.tryUnsafe</strong> with
* value of <code>false</code>. Default is <code>true</code>.
*/
public final class DetectionUtil {
private static final int JAVA_VERSION = javaVersion0();
private static final boolean HAS_UNSAFE = hasUnsafe(AtomicInteger.class.getClassLoader());
private static final boolean IS_WINDOWS;
static {
String os = System.getProperty("os.name").toLowerCase();
// windows
IS_WINDOWS = os.indexOf("win") >= 0;
}
/**
* Return <code>true</code> if the JVM is running on Windows
*
*/
public static boolean isWindows() {
return IS_WINDOWS;
}
public static boolean hasUnsafe() {
return HAS_UNSAFE;
}
public static int javaVersion() {
return JAVA_VERSION;
}
private static boolean hasUnsafe(ClassLoader loader) {
boolean useUnsafe = Boolean.valueOf(SystemPropertyUtil.get("org.jboss.netty.tryUnsafe", "true"));
if (!useUnsafe) {
return false;
}
try {
Class<?> unsafeClazz = Class.forName("sun.misc.Unsafe", true, loader);
return hasUnsafeField(unsafeClazz);
} catch (Exception e) {
// Ignore
}
return false;
}
private static boolean hasUnsafeField(final Class<?> unsafeClass) throws PrivilegedActionException {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
public Boolean run() throws Exception {
unsafeClass.getDeclaredField("theUnsafe");
return true;
}
});
}
private static int javaVersion0() {
try {
// Check if its android, if so handle it the same way as java6.
//
// See https://github.com/netty/netty/issues/282
Class.forName("android.app.Application");
return 6;
} catch (ClassNotFoundException e) {
//Ignore
}
try {
Deflater.class.getDeclaredField("SYNC_FLUSH");
return 7;
} catch (Exception e) {
// Ignore
}
try {
Double.class.getDeclaredField("MIN_NORMAL");
return 6;
} catch (Exception e) {
// Ignore
}
return 5;
}
private DetectionUtil() {
// only static method supported
}
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.util.internal;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Deflater;
/**
* Utility that detects various properties specific to the current runtime
* environment, such as Java version and the availability of the
* {@code sun.misc.Unsafe} object.
*
* <br>
* You can disable the use of {@code sun.misc.Unsafe} if you specify
* the System property <strong>org.jboss.netty.tryUnsafe</strong> with
* value of <code>false</code>. Default is <code>true</code>.
*/
public final class DetectionUtil {
private static final int JAVA_VERSION = javaVersion0();
private static final boolean HAS_UNSAFE = hasUnsafe(AtomicInteger.class.getClassLoader());
private static final boolean IS_WINDOWS;
static {
String os = System.getProperty("os.name").toLowerCase();
// windows
IS_WINDOWS = os.indexOf("win") >= 0;
}
/**
* Return <code>true</code> if the JVM is running on Windows
*
*/
public static boolean isWindows() {
return IS_WINDOWS;
}
public static boolean hasUnsafe() {
return HAS_UNSAFE;
}
public static int javaVersion() {
return JAVA_VERSION;
}
private static boolean hasUnsafe(ClassLoader loader) {
boolean useUnsafe = Boolean.valueOf(SystemPropertyUtil.get("org.jboss.netty.tryUnsafe", "true"));
if (!useUnsafe) {
return false;
}
try {
Class<?> unsafeClazz = Class.forName("sun.misc.Unsafe", true, loader);
return hasUnsafeField(unsafeClazz);
} catch (Exception e) {
// Ignore
}
return false;
}
private static boolean hasUnsafeField(final Class<?> unsafeClass) throws PrivilegedActionException {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
public Boolean run() throws Exception {
unsafeClass.getDeclaredField("theUnsafe");
return true;
}
});
}
private static int javaVersion0() {
try {
// Check if its android, if so handle it the same way as java6.
//
// See https://github.com/netty/netty/issues/282
Class.forName("android.app.Application");
return 6;
} catch (ClassNotFoundException e) {
//Ignore
}
try {
Deflater.class.getDeclaredField("SYNC_FLUSH");
return 7;
} catch (Exception e) {
// Ignore
}
try {
Double.class.getDeclaredField("MIN_NORMAL");
return 6;
} catch (Exception e) {
// Ignore
}
return 5;
}
private DetectionUtil() {
// only static method supported
}
}

View File

@ -1,100 +1,100 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.util.internal;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
/**
* This factory should be used to create the "optimal" {@link BlockingQueue}
* instance for the running JVM.
*/
public final class QueueFactory {
private static final boolean useUnsafe = DetectionUtil.hasUnsafe();
private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(QueueFactory.class);
private QueueFactory() {
// only use static methods!
}
/**
* Create a new unbound {@link BlockingQueue}
*
* @param itemClass the {@link Class} type which will be used as {@link BlockingQueue} items
* @return queue the {@link BlockingQueue} implementation
*/
public static <T> BlockingQueue<T> createQueue(Class<T> itemClass) {
// if we run in java >=7 its the best to just use the LinkedTransferQueue which
// comes with java bundled. See #273
if (DetectionUtil.javaVersion() >= 7) {
return new java.util.concurrent.LinkedTransferQueue<T>();
}
try {
if (useUnsafe) {
return new LinkedTransferQueue<T>();
}
} catch (Throwable t) {
// For whatever reason an exception was thrown while loading the LinkedTransferQueue
//
// This mostly happens because of a custom classloader or security policy that did not allow us to access the
// com.sun.Unmisc class. So just log it and fallback to the old LegacyLinkedTransferQueue that works in all cases
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Unable to instance LinkedTransferQueue, fallback to LegacyLinkedTransferQueue", t);
}
}
return new LegacyLinkedTransferQueue<T>();
}
/**
* Create a new unbound {@link BlockingQueue}
*
* @param collection the collection which should get copied to the newly created {@link BlockingQueue}
* @param itemClass the {@link Class} type which will be used as {@link BlockingQueue} items
* @return queue the {@link BlockingQueue} implementation
*/
public static <T> BlockingQueue<T> createQueue(Collection<? extends T> collection, Class<T> itemClass) {
// if we run in java >=7 its the best to just use the LinkedTransferQueue which
// comes with java bundled. See #273
if (DetectionUtil.javaVersion() >= 7) {
return new java.util.concurrent.LinkedTransferQueue<T>();
}
try {
if (useUnsafe) {
return new LinkedTransferQueue<T>(collection);
}
} catch (Throwable t) {
// For whatever reason an exception was thrown while loading the LinkedTransferQueue
//
// This mostly happens because of a custom classloader or security policy that did not allow us to access the
// com.sun.Unmisc class. So just log it and fallback to the old LegacyLinkedTransferQueue that works in all cases
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Unable to instance LinkedTransferQueue, fallback to LegacyLinkedTransferQueue", t);
}
}
return new LegacyLinkedTransferQueue<T>(collection);
}
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.util.internal;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
/**
* This factory should be used to create the "optimal" {@link BlockingQueue}
* instance for the running JVM.
*/
public final class QueueFactory {
private static final boolean useUnsafe = DetectionUtil.hasUnsafe();
private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(QueueFactory.class);
private QueueFactory() {
// only use static methods!
}
/**
* Create a new unbound {@link BlockingQueue}
*
* @param itemClass the {@link Class} type which will be used as {@link BlockingQueue} items
* @return queue the {@link BlockingQueue} implementation
*/
public static <T> BlockingQueue<T> createQueue(Class<T> itemClass) {
// if we run in java >=7 its the best to just use the LinkedTransferQueue which
// comes with java bundled. See #273
if (DetectionUtil.javaVersion() >= 7) {
return new java.util.concurrent.LinkedTransferQueue<T>();
}
try {
if (useUnsafe) {
return new LinkedTransferQueue<T>();
}
} catch (Throwable t) {
// For whatever reason an exception was thrown while loading the LinkedTransferQueue
//
// This mostly happens because of a custom classloader or security policy that did not allow us to access the
// com.sun.Unmisc class. So just log it and fallback to the old LegacyLinkedTransferQueue that works in all cases
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Unable to instance LinkedTransferQueue, fallback to LegacyLinkedTransferQueue", t);
}
}
return new LegacyLinkedTransferQueue<T>();
}
/**
* Create a new unbound {@link BlockingQueue}
*
* @param collection the collection which should get copied to the newly created {@link BlockingQueue}
* @param itemClass the {@link Class} type which will be used as {@link BlockingQueue} items
* @return queue the {@link BlockingQueue} implementation
*/
public static <T> BlockingQueue<T> createQueue(Collection<? extends T> collection, Class<T> itemClass) {
// if we run in java >=7 its the best to just use the LinkedTransferQueue which
// comes with java bundled. See #273
if (DetectionUtil.javaVersion() >= 7) {
return new java.util.concurrent.LinkedTransferQueue<T>();
}
try {
if (useUnsafe) {
return new LinkedTransferQueue<T>(collection);
}
} catch (Throwable t) {
// For whatever reason an exception was thrown while loading the LinkedTransferQueue
//
// This mostly happens because of a custom classloader or security policy that did not allow us to access the
// com.sun.Unmisc class. So just log it and fallback to the old LegacyLinkedTransferQueue that works in all cases
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Unable to instance LinkedTransferQueue, fallback to LegacyLinkedTransferQueue", t);
}
}
return new LegacyLinkedTransferQueue<T>(collection);
}
}