f4d3f81d6c
Motivation: AbstractUnsafe considers two possibilities during channel registration. First, the channel may be an outgoing connection, in which case it will be registered before becoming active. Second, the channel may be an incoming connection in, which case the channel will already be active when it is registered. To handle the second case, AbstractUnsafe checks if the channel is active after registration and calls ChannelPipeline.fireChannelActive() if so. However, if an active channel is deregistered and then re-registered this logic causes a second fireChannelActive() to be invoked. This is unexpected; it is reasonable for handlers to assume that this method will only be invoked once per channel. Modifications: This change introduces a flag into AbstractUnsafe to recognize if this is the first or a subsequent registration. ChannelPipeline.fireChannelActive() is only possible for the first registration. Result: ChannelPipeline.fireChannelActive() is only called once.
942 lines
30 KiB
Java
942 lines
30 KiB
Java
/*
|
|
* Copyright 2012 The Netty Project
|
|
*
|
|
* The Netty Project licenses this file to you under the Apache License,
|
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at:
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
* License for the specific language governing permissions and limitations
|
|
* under the License.
|
|
*/
|
|
package io.netty.channel;
|
|
|
|
import io.netty.buffer.ByteBufAllocator;
|
|
import io.netty.util.DefaultAttributeMap;
|
|
import io.netty.util.ReferenceCountUtil;
|
|
import io.netty.util.internal.EmptyArrays;
|
|
import io.netty.util.internal.OneTimeTask;
|
|
import io.netty.util.internal.PlatformDependent;
|
|
import io.netty.util.internal.logging.InternalLogger;
|
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
|
|
|
import java.io.IOException;
|
|
import java.net.ConnectException;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.NoRouteToHostException;
|
|
import java.net.SocketAddress;
|
|
import java.net.SocketException;
|
|
import java.nio.channels.ClosedChannelException;
|
|
import java.nio.channels.NotYetConnectedException;
|
|
import java.util.concurrent.RejectedExecutionException;
|
|
|
|
/**
|
|
* A skeletal {@link Channel} implementation.
|
|
*/
|
|
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
|
|
|
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
|
|
|
|
static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
|
|
static final NotYetConnectedException NOT_YET_CONNECTED_EXCEPTION = new NotYetConnectedException();
|
|
|
|
static {
|
|
CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
|
|
NOT_YET_CONNECTED_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
|
|
}
|
|
|
|
private MessageSizeEstimator.Handle estimatorHandle;
|
|
|
|
private final Channel parent;
|
|
private final ChannelId id;
|
|
private final Unsafe unsafe;
|
|
private final DefaultChannelPipeline pipeline;
|
|
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
|
|
private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true);
|
|
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
|
|
private final CloseFuture closeFuture = new CloseFuture(this);
|
|
|
|
private volatile SocketAddress localAddress;
|
|
private volatile SocketAddress remoteAddress;
|
|
private volatile EventLoop eventLoop;
|
|
private volatile boolean registered;
|
|
|
|
/** Cache for the string representation of this channel */
|
|
private boolean strValActive;
|
|
private String strVal;
|
|
|
|
/**
|
|
* Creates a new instance.
|
|
*
|
|
* @param parent
|
|
* the parent of this channel. {@code null} if there's no parent.
|
|
*/
|
|
protected AbstractChannel(Channel parent) {
|
|
this.parent = parent;
|
|
id = DefaultChannelId.newInstance();
|
|
unsafe = newUnsafe();
|
|
pipeline = new DefaultChannelPipeline(this);
|
|
}
|
|
|
|
/**
|
|
* Creates a new instance.
|
|
*
|
|
* @param parent
|
|
* the parent of this channel. {@code null} if there's no parent.
|
|
*/
|
|
protected AbstractChannel(Channel parent, ChannelId id) {
|
|
this.parent = parent;
|
|
this.id = id;
|
|
unsafe = newUnsafe();
|
|
pipeline = new DefaultChannelPipeline(this);
|
|
}
|
|
|
|
@Override
|
|
public final ChannelId id() {
|
|
return id;
|
|
}
|
|
|
|
@Override
|
|
public boolean isWritable() {
|
|
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
|
|
return buf != null && buf.isWritable();
|
|
}
|
|
|
|
@Override
|
|
public Channel parent() {
|
|
return parent;
|
|
}
|
|
|
|
@Override
|
|
public ChannelPipeline pipeline() {
|
|
return pipeline;
|
|
}
|
|
|
|
@Override
|
|
public ByteBufAllocator alloc() {
|
|
return config().getAllocator();
|
|
}
|
|
|
|
@Override
|
|
public EventLoop eventLoop() {
|
|
EventLoop eventLoop = this.eventLoop;
|
|
if (eventLoop == null) {
|
|
throw new IllegalStateException("channel not registered to an event loop");
|
|
}
|
|
return eventLoop;
|
|
}
|
|
|
|
@Override
|
|
public SocketAddress localAddress() {
|
|
SocketAddress localAddress = this.localAddress;
|
|
if (localAddress == null) {
|
|
try {
|
|
this.localAddress = localAddress = unsafe().localAddress();
|
|
} catch (Throwable t) {
|
|
// Sometimes fails on a closed socket in Windows.
|
|
return null;
|
|
}
|
|
}
|
|
return localAddress;
|
|
}
|
|
|
|
protected void invalidateLocalAddress() {
|
|
localAddress = null;
|
|
}
|
|
|
|
@Override
|
|
public SocketAddress remoteAddress() {
|
|
SocketAddress remoteAddress = this.remoteAddress;
|
|
if (remoteAddress == null) {
|
|
try {
|
|
this.remoteAddress = remoteAddress = unsafe().remoteAddress();
|
|
} catch (Throwable t) {
|
|
// Sometimes fails on a closed socket in Windows.
|
|
return null;
|
|
}
|
|
}
|
|
return remoteAddress;
|
|
}
|
|
|
|
/**
|
|
* Reset the stored remoteAddress
|
|
*/
|
|
protected void invalidateRemoteAddress() {
|
|
remoteAddress = null;
|
|
}
|
|
|
|
@Override
|
|
public boolean isRegistered() {
|
|
return registered;
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture bind(SocketAddress localAddress) {
|
|
return pipeline.bind(localAddress);
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture connect(SocketAddress remoteAddress) {
|
|
return pipeline.connect(remoteAddress);
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
|
|
return pipeline.connect(remoteAddress, localAddress);
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture disconnect() {
|
|
return pipeline.disconnect();
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture close() {
|
|
return pipeline.close();
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture deregister() {
|
|
return pipeline.deregister();
|
|
}
|
|
|
|
@Override
|
|
public Channel flush() {
|
|
pipeline.flush();
|
|
return this;
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
|
|
return pipeline.bind(localAddress, promise);
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
|
|
return pipeline.connect(remoteAddress, promise);
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
|
return pipeline.connect(remoteAddress, localAddress, promise);
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture disconnect(ChannelPromise promise) {
|
|
return pipeline.disconnect(promise);
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture close(ChannelPromise promise) {
|
|
return pipeline.close(promise);
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture deregister(ChannelPromise promise) {
|
|
return pipeline.deregister(promise);
|
|
}
|
|
|
|
@Override
|
|
public Channel read() {
|
|
pipeline.read();
|
|
return this;
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture write(Object msg) {
|
|
return pipeline.write(msg);
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture write(Object msg, ChannelPromise promise) {
|
|
return pipeline.write(msg, promise);
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture writeAndFlush(Object msg) {
|
|
return pipeline.writeAndFlush(msg);
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
|
|
return pipeline.writeAndFlush(msg, promise);
|
|
}
|
|
|
|
@Override
|
|
public ChannelPromise newPromise() {
|
|
return new DefaultChannelPromise(this);
|
|
}
|
|
|
|
@Override
|
|
public ChannelProgressivePromise newProgressivePromise() {
|
|
return new DefaultChannelProgressivePromise(this);
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture newSucceededFuture() {
|
|
return succeededFuture;
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture newFailedFuture(Throwable cause) {
|
|
return new FailedChannelFuture(this, null, cause);
|
|
}
|
|
|
|
@Override
|
|
public ChannelFuture closeFuture() {
|
|
return closeFuture;
|
|
}
|
|
|
|
@Override
|
|
public Unsafe unsafe() {
|
|
return unsafe;
|
|
}
|
|
|
|
/**
|
|
* Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}
|
|
*/
|
|
protected abstract AbstractUnsafe newUnsafe();
|
|
|
|
/**
|
|
* Returns the ID of this channel.
|
|
*/
|
|
@Override
|
|
public final int hashCode() {
|
|
return id.hashCode();
|
|
}
|
|
|
|
/**
|
|
* Returns {@code true} if and only if the specified object is identical
|
|
* with this channel (i.e: {@code this == o}).
|
|
*/
|
|
@Override
|
|
public final boolean equals(Object o) {
|
|
return this == o;
|
|
}
|
|
|
|
@Override
|
|
public final int compareTo(Channel o) {
|
|
if (this == o) {
|
|
return 0;
|
|
}
|
|
|
|
return id().compareTo(o.id());
|
|
}
|
|
|
|
/**
|
|
* Returns the {@link String} representation of this channel. The returned
|
|
* string contains the {@linkplain #hashCode()} ID}, {@linkplain #localAddress() local address},
|
|
* and {@linkplain #remoteAddress() remote address} of this channel for
|
|
* easier identification.
|
|
*/
|
|
@Override
|
|
public String toString() {
|
|
boolean active = isActive();
|
|
if (strValActive == active && strVal != null) {
|
|
return strVal;
|
|
}
|
|
|
|
SocketAddress remoteAddr = remoteAddress();
|
|
SocketAddress localAddr = localAddress();
|
|
if (remoteAddr != null) {
|
|
SocketAddress srcAddr;
|
|
SocketAddress dstAddr;
|
|
if (parent == null) {
|
|
srcAddr = localAddr;
|
|
dstAddr = remoteAddr;
|
|
} else {
|
|
srcAddr = remoteAddr;
|
|
dstAddr = localAddr;
|
|
}
|
|
|
|
StringBuilder buf = new StringBuilder(96)
|
|
.append("[id: 0x")
|
|
.append(id.asShortText())
|
|
.append(", ")
|
|
.append(srcAddr)
|
|
.append(active? " => " : " :> ")
|
|
.append(dstAddr)
|
|
.append(']');
|
|
strVal = buf.toString();
|
|
} else if (localAddr != null) {
|
|
StringBuilder buf = new StringBuilder(64)
|
|
.append("[id: 0x")
|
|
.append(id.asShortText())
|
|
.append(", ")
|
|
.append(localAddr)
|
|
.append(']');
|
|
strVal = buf.toString();
|
|
} else {
|
|
StringBuilder buf = new StringBuilder(16)
|
|
.append("[id: 0x")
|
|
.append(id.asShortText())
|
|
.append(']');
|
|
strVal = buf.toString();
|
|
}
|
|
|
|
strValActive = active;
|
|
return strVal;
|
|
}
|
|
|
|
@Override
|
|
public final ChannelPromise voidPromise() {
|
|
return voidPromise;
|
|
}
|
|
|
|
final MessageSizeEstimator.Handle estimatorHandle() {
|
|
if (estimatorHandle == null) {
|
|
estimatorHandle = config().getMessageSizeEstimator().newHandle();
|
|
}
|
|
return estimatorHandle;
|
|
}
|
|
|
|
/**
|
|
* {@link Unsafe} implementation which sub-classes must extend and use.
|
|
*/
|
|
protected abstract class AbstractUnsafe implements Unsafe {
|
|
|
|
private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
|
|
private RecvByteBufAllocator.Handle recvHandle;
|
|
private boolean inFlush0;
|
|
/** true if the channel has never been registered, false otherwise */
|
|
private boolean neverRegistered = true;
|
|
|
|
@Override
|
|
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
|
|
if (recvHandle == null) {
|
|
recvHandle = config().getRecvByteBufAllocator().newHandle();
|
|
}
|
|
return recvHandle;
|
|
}
|
|
|
|
@Override
|
|
public final ChannelHandlerInvoker invoker() {
|
|
return eventLoop().asInvoker();
|
|
}
|
|
|
|
@Override
|
|
public final ChannelOutboundBuffer outboundBuffer() {
|
|
return outboundBuffer;
|
|
}
|
|
|
|
@Override
|
|
public final SocketAddress localAddress() {
|
|
return localAddress0();
|
|
}
|
|
|
|
@Override
|
|
public final SocketAddress remoteAddress() {
|
|
return remoteAddress0();
|
|
}
|
|
|
|
@Override
|
|
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
|
|
if (eventLoop == null) {
|
|
throw new NullPointerException("eventLoop");
|
|
}
|
|
if (isRegistered()) {
|
|
promise.setFailure(new IllegalStateException("registered to an event loop already"));
|
|
return;
|
|
}
|
|
if (!isCompatible(eventLoop)) {
|
|
promise.setFailure(
|
|
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
|
|
return;
|
|
}
|
|
|
|
AbstractChannel.this.eventLoop = eventLoop;
|
|
|
|
if (eventLoop.inEventLoop()) {
|
|
register0(promise);
|
|
} else {
|
|
try {
|
|
eventLoop.execute(new OneTimeTask() {
|
|
@Override
|
|
public void run() {
|
|
register0(promise);
|
|
}
|
|
});
|
|
} catch (Throwable t) {
|
|
logger.warn(
|
|
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
|
|
AbstractChannel.this, t);
|
|
closeForcibly();
|
|
closeFuture.setClosed();
|
|
safeSetFailure(promise, t);
|
|
}
|
|
}
|
|
}
|
|
|
|
private void register0(ChannelPromise promise) {
|
|
try {
|
|
// check if the channel is still open as it could be closed in the mean time when the register
|
|
// call was outside of the eventLoop
|
|
if (!promise.setUncancellable() || !ensureOpen(promise)) {
|
|
return;
|
|
}
|
|
boolean firstRegistration = neverRegistered;
|
|
doRegister();
|
|
neverRegistered = false;
|
|
registered = true;
|
|
safeSetSuccess(promise);
|
|
pipeline.fireChannelRegistered();
|
|
// Only fire a channelActive if the channel has never been registered. This prevents firing
|
|
// multiple channel actives if the channel is deregistered and re-registered.
|
|
if (firstRegistration && isActive()) {
|
|
pipeline.fireChannelActive();
|
|
}
|
|
} catch (Throwable t) {
|
|
// Close the channel directly to avoid FD leak.
|
|
closeForcibly();
|
|
closeFuture.setClosed();
|
|
safeSetFailure(promise, t);
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
|
|
if (!promise.setUncancellable() || !ensureOpen(promise)) {
|
|
return;
|
|
}
|
|
|
|
// See: https://github.com/netty/netty/issues/576
|
|
if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&
|
|
Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
|
|
localAddress instanceof InetSocketAddress &&
|
|
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {
|
|
// Warn a user about the fact that a non-root user can't receive a
|
|
// broadcast packet on *nix if the socket is bound on non-wildcard address.
|
|
logger.warn(
|
|
"A non-root user can't receive a broadcast packet if the socket " +
|
|
"is not bound to a wildcard address; binding to a non-wildcard " +
|
|
"address (" + localAddress + ") anyway as requested.");
|
|
}
|
|
|
|
boolean wasActive = isActive();
|
|
try {
|
|
doBind(localAddress);
|
|
} catch (Throwable t) {
|
|
safeSetFailure(promise, t);
|
|
closeIfClosed();
|
|
return;
|
|
}
|
|
|
|
if (!wasActive && isActive()) {
|
|
invokeLater(new OneTimeTask() {
|
|
@Override
|
|
public void run() {
|
|
pipeline.fireChannelActive();
|
|
}
|
|
});
|
|
}
|
|
|
|
safeSetSuccess(promise);
|
|
}
|
|
|
|
@Override
|
|
public final void disconnect(final ChannelPromise promise) {
|
|
if (!promise.setUncancellable()) {
|
|
return;
|
|
}
|
|
|
|
boolean wasActive = isActive();
|
|
try {
|
|
doDisconnect();
|
|
} catch (Throwable t) {
|
|
safeSetFailure(promise, t);
|
|
closeIfClosed();
|
|
return;
|
|
}
|
|
|
|
if (wasActive && !isActive()) {
|
|
invokeLater(new OneTimeTask() {
|
|
@Override
|
|
public void run() {
|
|
pipeline.fireChannelInactive();
|
|
}
|
|
});
|
|
}
|
|
|
|
safeSetSuccess(promise);
|
|
closeIfClosed(); // doDisconnect() might have closed the channel
|
|
}
|
|
|
|
@Override
|
|
public final void close(final ChannelPromise promise) {
|
|
if (!promise.setUncancellable()) {
|
|
return;
|
|
}
|
|
|
|
if (inFlush0) {
|
|
invokeLater(new OneTimeTask() {
|
|
@Override
|
|
public void run() {
|
|
close(promise);
|
|
}
|
|
});
|
|
return;
|
|
}
|
|
|
|
if (closeFuture.isDone()) {
|
|
// Closed already.
|
|
safeSetSuccess(promise);
|
|
return;
|
|
}
|
|
|
|
boolean wasActive = isActive();
|
|
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
|
|
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
|
|
|
|
try {
|
|
doClose();
|
|
closeFuture.setClosed();
|
|
safeSetSuccess(promise);
|
|
} catch (Throwable t) {
|
|
closeFuture.setClosed();
|
|
safeSetFailure(promise, t);
|
|
}
|
|
|
|
// Fail all the queued messages
|
|
try {
|
|
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
|
|
outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
|
|
} finally {
|
|
|
|
if (wasActive && !isActive()) {
|
|
invokeLater(new OneTimeTask() {
|
|
@Override
|
|
public void run() {
|
|
pipeline.fireChannelInactive();
|
|
}
|
|
});
|
|
}
|
|
|
|
deregister(voidPromise());
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public final void closeForcibly() {
|
|
try {
|
|
doClose();
|
|
} catch (Exception e) {
|
|
logger.warn("Failed to close a channel.", e);
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public final void deregister(final ChannelPromise promise) {
|
|
if (!promise.setUncancellable()) {
|
|
return;
|
|
}
|
|
|
|
if (!registered) {
|
|
safeSetSuccess(promise);
|
|
return;
|
|
}
|
|
|
|
try {
|
|
doDeregister();
|
|
} catch (Throwable t) {
|
|
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
|
|
} finally {
|
|
if (registered) {
|
|
registered = false;
|
|
invokeLater(new OneTimeTask() {
|
|
@Override
|
|
public void run() {
|
|
pipeline.fireChannelUnregistered();
|
|
}
|
|
});
|
|
safeSetSuccess(promise);
|
|
} else {
|
|
// Some transports like local and AIO does not allow the deregistration of
|
|
// an open channel. Their doDeregister() calls close(). Consequently,
|
|
// close() calls deregister() again - no need to fire channelUnregistered.
|
|
safeSetSuccess(promise);
|
|
}
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public final void beginRead() {
|
|
if (!isActive()) {
|
|
return;
|
|
}
|
|
|
|
try {
|
|
doBeginRead();
|
|
} catch (final Exception e) {
|
|
invokeLater(new OneTimeTask() {
|
|
@Override
|
|
public void run() {
|
|
pipeline.fireExceptionCaught(e);
|
|
}
|
|
});
|
|
close(voidPromise());
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public final void write(Object msg, ChannelPromise promise) {
|
|
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
|
|
if (outboundBuffer == null) {
|
|
// If the outboundBuffer is null we know the channel was closed and so
|
|
// need to fail the future right away. If it is not null the handling of the rest
|
|
// will be done in flush0()
|
|
// See https://github.com/netty/netty/issues/2362
|
|
safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
|
|
// release message now to prevent resource-leak
|
|
ReferenceCountUtil.release(msg);
|
|
return;
|
|
}
|
|
|
|
int size;
|
|
try {
|
|
msg = filterOutboundMessage(msg);
|
|
size = estimatorHandle().size(msg);
|
|
if (size < 0) {
|
|
size = 0;
|
|
}
|
|
} catch (Throwable t) {
|
|
safeSetFailure(promise, t);
|
|
ReferenceCountUtil.release(msg);
|
|
return;
|
|
}
|
|
|
|
outboundBuffer.addMessage(msg, size, promise);
|
|
}
|
|
|
|
@Override
|
|
public final void flush() {
|
|
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
|
|
if (outboundBuffer == null) {
|
|
return;
|
|
}
|
|
|
|
outboundBuffer.addFlush();
|
|
flush0();
|
|
}
|
|
|
|
@SuppressWarnings("deprecation")
|
|
protected void flush0() {
|
|
if (inFlush0) {
|
|
// Avoid re-entrance
|
|
return;
|
|
}
|
|
|
|
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
|
|
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
|
|
return;
|
|
}
|
|
|
|
inFlush0 = true;
|
|
|
|
// Mark all pending write requests as failure if the channel is inactive.
|
|
if (!isActive()) {
|
|
try {
|
|
if (isOpen()) {
|
|
outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);
|
|
} else {
|
|
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
|
|
}
|
|
} finally {
|
|
inFlush0 = false;
|
|
}
|
|
return;
|
|
}
|
|
|
|
try {
|
|
doWrite(outboundBuffer);
|
|
} catch (Throwable t) {
|
|
outboundBuffer.failFlushed(t);
|
|
if (t instanceof IOException && config().isAutoClose()) {
|
|
close(voidPromise());
|
|
}
|
|
} finally {
|
|
inFlush0 = false;
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public final ChannelPromise voidPromise() {
|
|
return unsafeVoidPromise;
|
|
}
|
|
|
|
protected final boolean ensureOpen(ChannelPromise promise) {
|
|
if (isOpen()) {
|
|
return true;
|
|
}
|
|
|
|
safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* Marks the specified {@code promise} as success. If the {@code promise} is done already, log a message.
|
|
*/
|
|
protected final void safeSetSuccess(ChannelPromise promise) {
|
|
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
|
|
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Marks the specified {@code promise} as failure. If the {@code promise} is done already, log a message.
|
|
*/
|
|
protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
|
|
if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
|
|
logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
|
|
}
|
|
}
|
|
|
|
protected final void closeIfClosed() {
|
|
if (isOpen()) {
|
|
return;
|
|
}
|
|
close(voidPromise());
|
|
}
|
|
|
|
private void invokeLater(Runnable task) {
|
|
try {
|
|
// This method is used by outbound operation implementations to trigger an inbound event later.
|
|
// They do not trigger an inbound event immediately because an outbound operation might have been
|
|
// triggered by another inbound event handler method. If fired immediately, the call stack
|
|
// will look like this for example:
|
|
//
|
|
// handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
|
|
// -> handlerA.ctx.close()
|
|
// -> channel.unsafe.close()
|
|
// -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
|
|
//
|
|
// which means the execution of two inbound handler methods of the same handler overlap undesirably.
|
|
eventLoop().execute(task);
|
|
} catch (RejectedExecutionException e) {
|
|
logger.warn("Can't invoke task later as EventLoop rejected it", e);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Appends the remote address to the message of the exceptions caused by connection attempt failure.
|
|
*/
|
|
protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
|
|
if (cause instanceof ConnectException) {
|
|
Throwable newT = new ConnectException(cause.getMessage() + ": " + remoteAddress);
|
|
newT.setStackTrace(cause.getStackTrace());
|
|
cause = newT;
|
|
} else if (cause instanceof NoRouteToHostException) {
|
|
Throwable newT = new NoRouteToHostException(cause.getMessage() + ": " + remoteAddress);
|
|
newT.setStackTrace(cause.getStackTrace());
|
|
cause = newT;
|
|
} else if (cause instanceof SocketException) {
|
|
Throwable newT = new SocketException(cause.getMessage() + ": " + remoteAddress);
|
|
newT.setStackTrace(cause.getStackTrace());
|
|
cause = newT;
|
|
}
|
|
|
|
return cause;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Return {@code true} if the given {@link EventLoop} is compatible with this instance.
|
|
*/
|
|
protected abstract boolean isCompatible(EventLoop loop);
|
|
|
|
/**
|
|
* Returns the {@link SocketAddress} which is bound locally.
|
|
*/
|
|
protected abstract SocketAddress localAddress0();
|
|
|
|
/**
|
|
* Return the {@link SocketAddress} which the {@link Channel} is connected to.
|
|
*/
|
|
protected abstract SocketAddress remoteAddress0();
|
|
|
|
/**
|
|
* Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
|
|
*
|
|
* Sub-classes may override this method
|
|
*/
|
|
protected void doRegister() throws Exception {
|
|
// NOOP
|
|
}
|
|
|
|
/**
|
|
* Bind the {@link Channel} to the {@link SocketAddress}
|
|
*/
|
|
protected abstract void doBind(SocketAddress localAddress) throws Exception;
|
|
|
|
/**
|
|
* Disconnect this {@link Channel} from its remote peer
|
|
*/
|
|
protected abstract void doDisconnect() throws Exception;
|
|
|
|
/**
|
|
* Close the {@link Channel}
|
|
*/
|
|
protected abstract void doClose() throws Exception;
|
|
|
|
/**
|
|
* Deregister the {@link Channel} from its {@link EventLoop}.
|
|
*
|
|
* Sub-classes may override this method
|
|
*/
|
|
protected void doDeregister() throws Exception {
|
|
// NOOP
|
|
}
|
|
|
|
/**
|
|
* Schedule a read operation.
|
|
*/
|
|
protected abstract void doBeginRead() throws Exception;
|
|
|
|
/**
|
|
* Flush the content of the given buffer to the remote peer.
|
|
*/
|
|
protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
|
|
|
|
/**
|
|
* Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
|
|
* the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
|
|
*/
|
|
protected Object filterOutboundMessage(Object msg) throws Exception {
|
|
return msg;
|
|
}
|
|
|
|
static final class CloseFuture extends DefaultChannelPromise {
|
|
|
|
CloseFuture(AbstractChannel ch) {
|
|
super(ch);
|
|
}
|
|
|
|
@Override
|
|
public ChannelPromise setSuccess() {
|
|
throw new IllegalStateException();
|
|
}
|
|
|
|
@Override
|
|
public ChannelPromise setFailure(Throwable cause) {
|
|
throw new IllegalStateException();
|
|
}
|
|
|
|
@Override
|
|
public boolean trySuccess() {
|
|
throw new IllegalStateException();
|
|
}
|
|
|
|
@Override
|
|
public boolean tryFailure(Throwable cause) {
|
|
throw new IllegalStateException();
|
|
}
|
|
|
|
boolean setClosed() {
|
|
return super.trySuccess();
|
|
}
|
|
}
|
|
}
|