No need for AtomicBoolean as we run in the eventloop. See #396
This commit is contained in:
parent
eccc28965e
commit
31cebd7ce2
@ -28,7 +28,6 @@ import java.nio.channels.AsynchronousChannelGroup;
|
|||||||
import java.nio.channels.AsynchronousCloseException;
|
import java.nio.channels.AsynchronousCloseException;
|
||||||
import java.nio.channels.AsynchronousServerSocketChannel;
|
import java.nio.channels.AsynchronousServerSocketChannel;
|
||||||
import java.nio.channels.AsynchronousSocketChannel;
|
import java.nio.channels.AsynchronousSocketChannel;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
public class AioServerSocketChannel extends AbstractAioChannel implements ServerSocketChannel {
|
public class AioServerSocketChannel extends AbstractAioChannel implements ServerSocketChannel {
|
||||||
|
|
||||||
@ -36,7 +35,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
|
|||||||
private static final InternalLogger logger =
|
private static final InternalLogger logger =
|
||||||
InternalLoggerFactory.getInstance(AioServerSocketChannel.class);
|
InternalLoggerFactory.getInstance(AioServerSocketChannel.class);
|
||||||
private volatile AioServerSocketChannelConfig config;
|
private volatile AioServerSocketChannelConfig config;
|
||||||
final AtomicBoolean closed = new AtomicBoolean(false);
|
private boolean closed;
|
||||||
|
|
||||||
public AioServerSocketChannel() {
|
public AioServerSocketChannel() {
|
||||||
super(null, null);
|
super(null, null);
|
||||||
@ -89,7 +88,8 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doClose() throws Exception {
|
protected void doClose() throws Exception {
|
||||||
if (closed.compareAndSet(false, true)) {
|
if (!closed) {
|
||||||
|
closed = true;
|
||||||
javaChannel().close();
|
javaChannel().close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -136,7 +136,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
|
|||||||
boolean asyncClosed = false;
|
boolean asyncClosed = false;
|
||||||
if (t instanceof AsynchronousCloseException) {
|
if (t instanceof AsynchronousCloseException) {
|
||||||
asyncClosed = true;
|
asyncClosed = true;
|
||||||
channel.closed.set(true);
|
channel.closed = true;
|
||||||
}
|
}
|
||||||
// check if the exception was thrown because the channel was closed before
|
// check if the exception was thrown because the channel was closed before
|
||||||
// log something
|
// log something
|
||||||
|
@ -29,7 +29,6 @@ import java.nio.channels.AsynchronousChannelGroup;
|
|||||||
import java.nio.channels.AsynchronousCloseException;
|
import java.nio.channels.AsynchronousCloseException;
|
||||||
import java.nio.channels.AsynchronousSocketChannel;
|
import java.nio.channels.AsynchronousSocketChannel;
|
||||||
import java.nio.channels.CompletionHandler;
|
import java.nio.channels.CompletionHandler;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
|
|
||||||
public class AioSocketChannel extends AbstractAioChannel implements SocketChannel {
|
public class AioSocketChannel extends AbstractAioChannel implements SocketChannel {
|
||||||
@ -38,8 +37,8 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
private static final CompletionHandler<Integer, AioSocketChannel> READ_HANDLER = new ReadHandler();
|
private static final CompletionHandler<Integer, AioSocketChannel> READ_HANDLER = new ReadHandler();
|
||||||
private static final CompletionHandler<Integer, AioSocketChannel> WRITE_HANDLER = new WriteHandler();
|
private static final CompletionHandler<Integer, AioSocketChannel> WRITE_HANDLER = new WriteHandler();
|
||||||
|
|
||||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
private boolean closed;
|
||||||
private final AtomicBoolean flushing = new AtomicBoolean(false);
|
private boolean flushing;
|
||||||
private volatile AioSocketChannelConfig config;
|
private volatile AioSocketChannelConfig config;
|
||||||
|
|
||||||
public AioSocketChannel() {
|
public AioSocketChannel() {
|
||||||
@ -158,7 +157,8 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doClose() throws Exception {
|
protected void doClose() throws Exception {
|
||||||
if (closed.compareAndSet(false, true)) {
|
if (!closed) {
|
||||||
|
closed = true;
|
||||||
javaChannel().close();
|
javaChannel().close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -179,7 +179,8 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
// Only one pending write can be scheduled at one time. Otherwise
|
// Only one pending write can be scheduled at one time. Otherwise
|
||||||
// a PendingWriteException will be thrown. So use CAS to not run
|
// a PendingWriteException will be thrown. So use CAS to not run
|
||||||
// into this
|
// into this
|
||||||
if (flushing.compareAndSet(false, true)) {
|
if (!flushing) {
|
||||||
|
flushing = true;
|
||||||
ByteBuffer buffer = buf.nioBuffer();
|
ByteBuffer buffer = buf.nioBuffer();
|
||||||
javaChannel().write(buffer, this, WRITE_HANDLER);
|
javaChannel().write(buffer, this, WRITE_HANDLER);
|
||||||
}
|
}
|
||||||
@ -204,7 +205,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Allow to have the next write pending
|
// Allow to have the next write pending
|
||||||
channel.flushing.set(false);
|
channel.flushing = false;
|
||||||
try {
|
try {
|
||||||
// try to flush it again if nothing is left it will return fast here
|
// try to flush it again if nothing is left it will return fast here
|
||||||
channel.doFlushByteBuffer(buf);
|
channel.doFlushByteBuffer(buf);
|
||||||
@ -217,7 +218,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
@Override
|
@Override
|
||||||
protected void failed0(Throwable cause, AioSocketChannel channel) {
|
protected void failed0(Throwable cause, AioSocketChannel channel) {
|
||||||
if (cause instanceof AsynchronousCloseException) {
|
if (cause instanceof AsynchronousCloseException) {
|
||||||
channel.closed.set(true);
|
channel.closed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
channel.notifyFlushFutures(cause);
|
channel.notifyFlushFutures(cause);
|
||||||
@ -232,7 +233,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Allow to have the next write pending
|
// Allow to have the next write pending
|
||||||
channel.flushing.set(false);
|
channel.flushing = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -263,7 +264,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
if (t instanceof AsynchronousCloseException) {
|
if (t instanceof AsynchronousCloseException) {
|
||||||
channel.closed.set(true);
|
channel.closed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (read) {
|
if (read) {
|
||||||
@ -293,7 +294,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
@Override
|
@Override
|
||||||
protected void failed0(Throwable t, AioSocketChannel channel) {
|
protected void failed0(Throwable t, AioSocketChannel channel) {
|
||||||
if (t instanceof AsynchronousCloseException) {
|
if (t instanceof AsynchronousCloseException) {
|
||||||
channel.closed.set(true);
|
channel.closed = true;
|
||||||
|
|
||||||
// TODO: This seems wrong!
|
// TODO: This seems wrong!
|
||||||
return;
|
return;
|
||||||
@ -323,7 +324,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
@Override
|
@Override
|
||||||
protected void failed0(Throwable exc, AioSocketChannel channel) {
|
protected void failed0(Throwable exc, AioSocketChannel channel) {
|
||||||
if (exc instanceof AsynchronousCloseException) {
|
if (exc instanceof AsynchronousCloseException) {
|
||||||
channel.closed.set(true);
|
channel.closed = true;
|
||||||
}
|
}
|
||||||
((AsyncUnsafe) channel.unsafe()).connectFailed(exc);
|
((AsyncUnsafe) channel.unsafe()).connectFailed(exc);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user