Fix another data race
This commit is contained in:
parent
b97b3c602b
commit
b79e0b0882
@ -38,12 +38,8 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.BYTE, false);
|
private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.BYTE, false);
|
||||||
|
|
||||||
private static final CompletionHandler<Void, AioSocketChannel> CONNECT_HANDLER = new ConnectHandler();
|
private static final CompletionHandler<Void, AioSocketChannel> CONNECT_HANDLER = new ConnectHandler();
|
||||||
private static final CompletionHandler<Integer, AioSocketChannel> READ_HANDLER = new ReadHandler();
|
|
||||||
private static final CompletionHandler<Integer, AioSocketChannel> WRITE_HANDLER = new WriteHandler();
|
private static final CompletionHandler<Integer, AioSocketChannel> WRITE_HANDLER = new WriteHandler();
|
||||||
|
private static final CompletionHandler<Integer, AioSocketChannel> READ_HANDLER = new ReadHandler();
|
||||||
private final AioSocketChannelConfig config;
|
|
||||||
private boolean closed;
|
|
||||||
private boolean flushing;
|
|
||||||
|
|
||||||
private static AsynchronousSocketChannel newSocket() {
|
private static AsynchronousSocketChannel newSocket() {
|
||||||
try {
|
try {
|
||||||
@ -53,6 +49,26 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final Runnable readTask = new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
ByteBuf byteBuf = pipeline().inboundByteBuffer();
|
||||||
|
if (!byteBuf.readable()) {
|
||||||
|
byteBuf.clear();
|
||||||
|
} else {
|
||||||
|
expandReadBuffer(byteBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get a ByteBuffer view on the ByteBuf
|
||||||
|
ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes());
|
||||||
|
javaChannel().read(buffer, AioSocketChannel.this, READ_HANDLER);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
private final AioSocketChannelConfig config;
|
||||||
|
private boolean closed;
|
||||||
|
private boolean flushing;
|
||||||
|
|
||||||
public AioSocketChannel() {
|
public AioSocketChannel() {
|
||||||
this(null, null, newSocket());
|
this(null, null, newSocket());
|
||||||
}
|
}
|
||||||
@ -115,27 +131,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
return new Runnable() {
|
return readTask;
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
read();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Trigger a read from the {@link AioSocketChannel}
|
|
||||||
*/
|
|
||||||
void read() {
|
|
||||||
ByteBuf byteBuf = pipeline().inboundByteBuffer();
|
|
||||||
if (!byteBuf.readable()) {
|
|
||||||
byteBuf.clear();
|
|
||||||
} else {
|
|
||||||
expandReadBuffer(byteBuf);
|
|
||||||
}
|
|
||||||
// Get a ByteBuffer view on the ByteBuf
|
|
||||||
ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes());
|
|
||||||
javaChannel().read(buffer, this, READ_HANDLER);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean expandReadBuffer(ByteBuf byteBuf) {
|
private static boolean expandReadBuffer(ByteBuf byteBuf) {
|
||||||
@ -294,7 +290,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
channel.unsafe().close(channel.unsafe().voidFuture());
|
channel.unsafe().close(channel.unsafe().voidFuture());
|
||||||
} else {
|
} else {
|
||||||
// start the next read
|
// start the next read
|
||||||
channel.read();
|
channel.eventLoop().execute(channel.readTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -312,7 +308,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
channel.unsafe().close(channel.unsafe().voidFuture());
|
channel.unsafe().close(channel.unsafe().voidFuture());
|
||||||
} else {
|
} else {
|
||||||
// start the next read
|
// start the next read
|
||||||
channel.read();
|
channel.eventLoop().execute(channel.readTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -325,7 +321,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
channel.pipeline().fireChannelActive();
|
channel.pipeline().fireChannelActive();
|
||||||
|
|
||||||
// start reading from channel
|
// start reading from channel
|
||||||
channel.read();
|
channel.eventLoop().execute(channel.readTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
x
Reference in New Issue
Block a user