Optimize the handling of fireEventLater if the current thread is the
worker thread. See #187 and #140
This commit is contained in:
parent
5fdd2dea12
commit
c2bc463d61
|
@ -28,13 +28,18 @@ public abstract class AbstractScptChannelSink extends AbstractChannelSink{
|
||||||
Channel ch = e.getChannel();
|
Channel ch = e.getChannel();
|
||||||
if (ch instanceof SctpChannelImpl) {
|
if (ch instanceof SctpChannelImpl) {
|
||||||
SctpChannelImpl channel = (SctpChannelImpl) ch;
|
SctpChannelImpl channel = (SctpChannelImpl) ch;
|
||||||
channel.worker.fireEventLater(new Runnable() {
|
// check if the current thread is a worker thread, and only fire the event later if thats not the case
|
||||||
|
if (channel.worker.thread != Thread.currentThread()) {
|
||||||
|
channel.worker.fireEventLater(new Runnable() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
pipeline.sendUpstream(e);
|
pipeline.sendUpstream(e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
} else {
|
||||||
|
pipeline.sendUpstream(e);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,7 +65,7 @@ class SctpWorker implements Worker {
|
||||||
|
|
||||||
private final Executor executor;
|
private final Executor executor;
|
||||||
private boolean started;
|
private boolean started;
|
||||||
private volatile Thread thread;
|
volatile Thread thread;
|
||||||
volatile Selector selector;
|
volatile Selector selector;
|
||||||
private final AtomicBoolean wakenUp = new AtomicBoolean();
|
private final AtomicBoolean wakenUp = new AtomicBoolean();
|
||||||
private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
|
private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
|
||||||
|
|
|
@ -28,13 +28,18 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink{
|
||||||
Channel ch = e.getChannel();
|
Channel ch = e.getChannel();
|
||||||
if (ch instanceof AbstractNioChannel<?>) {
|
if (ch instanceof AbstractNioChannel<?>) {
|
||||||
AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch;
|
AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch;
|
||||||
channel.worker.fireEventLater(new Runnable() {
|
// check if the current thread is a worker thread if so we can send the event now
|
||||||
|
if (channel.worker.thread != Thread.currentThread()) {
|
||||||
|
channel.worker.fireEventLater(new Runnable() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
pipeline.sendUpstream(e);
|
pipeline.sendUpstream(e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
} else {
|
||||||
|
pipeline.sendUpstream(e);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,7 +30,7 @@ public abstract class AbstractOioChannelSink extends AbstractChannelSink{
|
||||||
if (ch instanceof AbstractOioChannel) {
|
if (ch instanceof AbstractOioChannel) {
|
||||||
AbstractOioChannel channel = (AbstractOioChannel) ch;
|
AbstractOioChannel channel = (AbstractOioChannel) ch;
|
||||||
Worker worker = channel.worker;
|
Worker worker = channel.worker;
|
||||||
if (worker != null) {
|
if (worker != null && channel.workerThread != Thread.currentThread()) {
|
||||||
channel.worker.fireEventLater(new Runnable() {
|
channel.worker.fireEventLater(new Runnable() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -39,7 +39,7 @@ public abstract class AbstractOioChannelSink extends AbstractChannelSink{
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
// no worker thread yet so just fire the event now
|
// no worker thread yet or the current thread is a worker thread so just fire the event now
|
||||||
pipeline.sendUpstream(e);
|
pipeline.sendUpstream(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user