Change AbstractChannel#doRegister return type from Runnable to void.
This commit is contained in:
parent
ca59c1201e
commit
bf2430d255
@ -428,13 +428,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
if (!ensureOpen(promise)) {
|
if (!ensureOpen(promise)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Runnable postRegisterTask = doRegister();
|
doRegister();
|
||||||
registered = true;
|
registered = true;
|
||||||
promise.setSuccess();
|
promise.setSuccess();
|
||||||
pipeline.fireChannelRegistered();
|
pipeline.fireChannelRegistered();
|
||||||
if (postRegisterTask != null) {
|
|
||||||
postRegisterTask.run();
|
|
||||||
}
|
|
||||||
if (isActive()) {
|
if (isActive()) {
|
||||||
pipeline.fireChannelActive();
|
pipeline.fireChannelActive();
|
||||||
}
|
}
|
||||||
@ -731,12 +728,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
|
* Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
|
||||||
* You can return a {@link Runnable} which will be run as post-task of the registration process.
|
|
||||||
*
|
*
|
||||||
* Sub-classes may override this method as it will just return {@code null}
|
* Sub-classes may override this method
|
||||||
*/
|
*/
|
||||||
protected Runnable doRegister() throws Exception {
|
protected void doRegister() throws Exception {
|
||||||
return null;
|
// NOOP
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -271,9 +271,8 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Runnable doRegister() throws Exception {
|
protected void doRegister() throws Exception {
|
||||||
state = 1;
|
state = 1;
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -150,39 +150,26 @@ public class LocalChannel extends AbstractChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Runnable doRegister() throws Exception {
|
protected void doRegister() throws Exception {
|
||||||
final LocalChannel peer = this.peer;
|
|
||||||
Runnable postRegisterTask;
|
|
||||||
|
|
||||||
if (peer != null) {
|
if (peer != null) {
|
||||||
state = 2;
|
state = 2;
|
||||||
|
|
||||||
peer.remoteAddress = parent().localAddress();
|
peer.remoteAddress = parent().localAddress();
|
||||||
peer.state = 2;
|
peer.state = 2;
|
||||||
|
|
||||||
// Ensure the peer's channelActive event is triggered *after* this channel's
|
// Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
|
||||||
// channelRegistered event is triggered, so that this channel's pipeline is fully
|
// This ensures that if both channels are on the same event loop, the peer's channelActive
|
||||||
// initialized by ChannelInitializer.
|
// event is triggered *after* this channel's channelRegistered event, so that this channel's
|
||||||
final EventLoop peerEventLoop = peer.eventLoop();
|
// pipeline is fully initialized by ChannelInitializer before any channelRead events.
|
||||||
postRegisterTask = new Runnable() {
|
peer.eventLoop().execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
peerEventLoop.execute(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
peer.connectPromise.setSuccess();
|
|
||||||
peer.pipeline().fireChannelActive();
|
peer.pipeline().fireChannelActive();
|
||||||
|
peer.connectPromise.setSuccess();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
};
|
|
||||||
} else {
|
|
||||||
postRegisterTask = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
|
((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
|
||||||
|
|
||||||
return postRegisterTask;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -82,9 +82,8 @@ public class LocalServerChannel extends AbstractServerChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Runnable doRegister() throws Exception {
|
protected void doRegister() throws Exception {
|
||||||
((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
|
((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -114,13 +113,13 @@ public class LocalServerChannel extends AbstractServerChannel {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
ChannelPipeline pipeline = pipeline();
|
|
||||||
Queue<Object> inboundBuffer = this.inboundBuffer;
|
Queue<Object> inboundBuffer = this.inboundBuffer;
|
||||||
if (inboundBuffer.isEmpty()) {
|
if (inboundBuffer.isEmpty()) {
|
||||||
acceptInProgress = true;
|
acceptInProgress = true;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ChannelPipeline pipeline = pipeline();
|
||||||
for (;;) {
|
for (;;) {
|
||||||
Object m = inboundBuffer.poll();
|
Object m = inboundBuffer.poll();
|
||||||
if (m == null) {
|
if (m == null) {
|
||||||
@ -132,17 +131,25 @@ public class LocalServerChannel extends AbstractServerChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
LocalChannel serve(final LocalChannel peer) {
|
LocalChannel serve(final LocalChannel peer) {
|
||||||
LocalChannel child = new LocalChannel(this, peer);
|
final LocalChannel child = new LocalChannel(this, peer);
|
||||||
|
if (eventLoop().inEventLoop()) {
|
||||||
serve0(child);
|
serve0(child);
|
||||||
|
} else {
|
||||||
|
eventLoop().execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
serve0(child);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
return child;
|
return child;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void serve0(final LocalChannel child) {
|
private void serve0(final LocalChannel child) {
|
||||||
if (eventLoop().inEventLoop()) {
|
|
||||||
final ChannelPipeline pipeline = pipeline();
|
|
||||||
inboundBuffer.add(child);
|
inboundBuffer.add(child);
|
||||||
if (acceptInProgress) {
|
if (acceptInProgress) {
|
||||||
acceptInProgress = false;
|
acceptInProgress = false;
|
||||||
|
ChannelPipeline pipeline = pipeline();
|
||||||
for (;;) {
|
for (;;) {
|
||||||
Object m = inboundBuffer.poll();
|
Object m = inboundBuffer.poll();
|
||||||
if (m == null) {
|
if (m == null) {
|
||||||
@ -152,13 +159,5 @@ public class LocalServerChannel extends AbstractServerChannel {
|
|||||||
}
|
}
|
||||||
pipeline.fireChannelReadComplete();
|
pipeline.fireChannelReadComplete();
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
eventLoop().execute(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
serve0(child);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -277,12 +277,12 @@ public abstract class AbstractNioChannel extends AbstractChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Runnable doRegister() throws Exception {
|
protected void doRegister() throws Exception {
|
||||||
boolean selected = false;
|
boolean selected = false;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
try {
|
try {
|
||||||
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
|
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
|
||||||
return null;
|
return;
|
||||||
} catch (CancelledKeyException e) {
|
} catch (CancelledKeyException e) {
|
||||||
if (!selected) {
|
if (!selected) {
|
||||||
// Force the Selector to select now as the "canceled" SelectionKey may still be
|
// Force the Selector to select now as the "canceled" SelectionKey may still be
|
||||||
|
Loading…
x
Reference in New Issue
Block a user