Adjust name. See #396
This commit is contained in:
parent
2ff22ff4c3
commit
ffc6551acc
@ -121,7 +121,7 @@ public class AsyncServerSocketChannel extends AbstractAsyncChannel implements Se
|
|||||||
channel.javaChannel().accept(channel, this);
|
channel.javaChannel().accept(channel, this);
|
||||||
|
|
||||||
// create the socket add it to the buffer and fire the event
|
// create the socket add it to the buffer and fire the event
|
||||||
channel.pipeline().inboundMessageBuffer().add(new AsyncSocketchannel(channel, null, ch));
|
channel.pipeline().inboundMessageBuffer().add(new AsyncSocketChannel(channel, null, ch));
|
||||||
channel.pipeline().fireInboundBufferUpdated();
|
channel.pipeline().fireInboundBufferUpdated();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,11 +32,11 @@ import java.nio.channels.AsynchronousSocketChannel;
|
|||||||
import java.nio.channels.CompletionHandler;
|
import java.nio.channels.CompletionHandler;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
public class AsyncSocketchannel extends AbstractAsyncChannel {
|
public class AsyncSocketChannel extends AbstractAsyncChannel {
|
||||||
|
|
||||||
private static final CompletionHandler<Void, AsyncSocketchannel> CONNECT_HANDLER = new ConnectHandler();
|
private static final CompletionHandler<Void, AsyncSocketChannel> CONNECT_HANDLER = new ConnectHandler();
|
||||||
private static final CompletionHandler<Integer, AsyncSocketchannel> READ_HANDLER = new ReadHandler();
|
private static final CompletionHandler<Integer, AsyncSocketChannel> READ_HANDLER = new ReadHandler();
|
||||||
private static final CompletionHandler<Integer, AsyncSocketchannel> WRITE_HANDLER = new WriteHandler();
|
private static final CompletionHandler<Integer, AsyncSocketChannel> WRITE_HANDLER = new WriteHandler();
|
||||||
private static final ChannelStateHandler READ_START_HANDLER = new ChannelStateHandlerAdapter() {
|
private static final ChannelStateHandler READ_START_HANDLER = new ChannelStateHandlerAdapter() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -45,7 +45,7 @@ public class AsyncSocketchannel extends AbstractAsyncChannel {
|
|||||||
super.channelActive(ctx);
|
super.channelActive(ctx);
|
||||||
|
|
||||||
// once the channel is active, the first read is scheduled
|
// once the channel is active, the first read is scheduled
|
||||||
AsyncSocketchannel.read((AsyncSocketchannel)ctx.channel());
|
AsyncSocketChannel.read((AsyncSocketChannel)ctx.channel());
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
ctx.pipeline().remove(this);
|
ctx.pipeline().remove(this);
|
||||||
@ -58,11 +58,11 @@ public class AsyncSocketchannel extends AbstractAsyncChannel {
|
|||||||
private final AtomicBoolean flushing = new AtomicBoolean(false);
|
private final AtomicBoolean flushing = new AtomicBoolean(false);
|
||||||
private volatile AsyncSocketChannelConfig config;
|
private volatile AsyncSocketChannelConfig config;
|
||||||
|
|
||||||
public AsyncSocketchannel() {
|
public AsyncSocketChannel() {
|
||||||
this(null, null, null);
|
this(null, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public AsyncSocketchannel(AsyncServerSocketChannel parent, Integer id, AsynchronousSocketChannel channel) {
|
public AsyncSocketChannel(AsyncServerSocketChannel parent, Integer id, AsynchronousSocketChannel channel) {
|
||||||
super(parent, id);
|
super(parent, id);
|
||||||
this.ch = channel;
|
this.ch = channel;
|
||||||
if (ch != null) {
|
if (ch != null) {
|
||||||
@ -133,10 +133,10 @@ public class AsyncSocketchannel extends AbstractAsyncChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Trigger a read from the {@link AsyncSocketchannel}
|
* Trigger a read from the {@link AsyncSocketChannel}
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
private static void read(AsyncSocketchannel channel) {
|
private static void read(AsyncSocketChannel channel) {
|
||||||
ByteBuf byteBuf = channel.pipeline().inboundByteBuffer();
|
ByteBuf byteBuf = channel.pipeline().inboundByteBuffer();
|
||||||
expandReadBuffer(byteBuf);
|
expandReadBuffer(byteBuf);
|
||||||
|
|
||||||
@ -189,10 +189,10 @@ public class AsyncSocketchannel extends AbstractAsyncChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static final class WriteHandler implements CompletionHandler<Integer, AsyncSocketchannel> {
|
private static final class WriteHandler implements CompletionHandler<Integer, AsyncSocketChannel> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void completed(Integer result, AsyncSocketchannel channel) {
|
public void completed(Integer result, AsyncSocketChannel channel) {
|
||||||
ByteBuf buf = channel.pipeline().outboundByteBuffer();
|
ByteBuf buf = channel.pipeline().outboundByteBuffer();
|
||||||
|
|
||||||
if (result > 0) {
|
if (result > 0) {
|
||||||
@ -213,7 +213,7 @@ public class AsyncSocketchannel extends AbstractAsyncChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void failed(Throwable cause, AsyncSocketchannel channel) {
|
public void failed(Throwable cause, AsyncSocketChannel channel) {
|
||||||
ByteBuf buf = channel.pipeline().outboundByteBuffer();
|
ByteBuf buf = channel.pipeline().outboundByteBuffer();
|
||||||
if (!buf.readable()) {
|
if (!buf.readable()) {
|
||||||
buf.discardReadBytes();
|
buf.discardReadBytes();
|
||||||
@ -229,10 +229,10 @@ public class AsyncSocketchannel extends AbstractAsyncChannel {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class ReadHandler implements CompletionHandler<Integer, AsyncSocketchannel> {
|
private static final class ReadHandler implements CompletionHandler<Integer, AsyncSocketChannel> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void completed(Integer result, AsyncSocketchannel channel) {
|
public void completed(Integer result, AsyncSocketChannel channel) {
|
||||||
assert channel.eventLoop().inEventLoop();
|
assert channel.eventLoop().inEventLoop();
|
||||||
|
|
||||||
final ChannelPipeline pipeline = channel.pipeline();
|
final ChannelPipeline pipeline = channel.pipeline();
|
||||||
@ -273,35 +273,35 @@ public class AsyncSocketchannel extends AbstractAsyncChannel {
|
|||||||
channel.close(channel.unsafe().voidFuture());
|
channel.close(channel.unsafe().voidFuture());
|
||||||
} else {
|
} else {
|
||||||
// start the next read
|
// start the next read
|
||||||
AsyncSocketchannel.read(channel);
|
AsyncSocketChannel.read(channel);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void failed(Throwable t, AsyncSocketchannel channel) {
|
public void failed(Throwable t, AsyncSocketChannel channel) {
|
||||||
channel.pipeline().fireExceptionCaught(t);
|
channel.pipeline().fireExceptionCaught(t);
|
||||||
if (t instanceof IOException) {
|
if (t instanceof IOException) {
|
||||||
channel.close(channel.unsafe().voidFuture());
|
channel.close(channel.unsafe().voidFuture());
|
||||||
} else {
|
} else {
|
||||||
// start the next read
|
// start the next read
|
||||||
AsyncSocketchannel.read(channel);
|
AsyncSocketChannel.read(channel);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class ConnectHandler implements CompletionHandler<Void, AsyncSocketchannel> {
|
private static final class ConnectHandler implements CompletionHandler<Void, AsyncSocketChannel> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void completed(Void result, AsyncSocketchannel channel) {
|
public void completed(Void result, AsyncSocketChannel channel) {
|
||||||
((AsyncUnsafe) channel.unsafe()).connectSuccess();
|
((AsyncUnsafe) channel.unsafe()).connectSuccess();
|
||||||
|
|
||||||
// start reading from channel
|
// start reading from channel
|
||||||
AsyncSocketchannel.read(channel);
|
AsyncSocketChannel.read(channel);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void failed(Throwable exc, AsyncSocketchannel channel) {
|
public void failed(Throwable exc, AsyncSocketChannel channel) {
|
||||||
((AsyncUnsafe) channel.unsafe()).connectFailed(exc);
|
((AsyncUnsafe) channel.unsafe()).connectFailed(exc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -10,25 +10,25 @@ import io.netty.channel.ChannelInboundByteHandlerAdapter;
|
|||||||
import io.netty.channel.ChannelInitializer;
|
import io.netty.channel.ChannelInitializer;
|
||||||
import io.netty.channel.socket.nio2.AsyncEventLoop;
|
import io.netty.channel.socket.nio2.AsyncEventLoop;
|
||||||
import io.netty.channel.socket.nio2.AsyncServerSocketChannel;
|
import io.netty.channel.socket.nio2.AsyncServerSocketChannel;
|
||||||
import io.netty.channel.socket.nio2.AsyncSocketchannel;
|
import io.netty.channel.socket.nio2.AsyncSocketChannel;
|
||||||
import io.netty.util.CharsetUtil;
|
|
||||||
|
|
||||||
public class AsyncTransportTest {
|
public class AsyncTransportTest {
|
||||||
|
|
||||||
public static void main(String args[]) {
|
public static void main(String args[]) {
|
||||||
|
AsyncEventLoop loop = new AsyncEventLoop();
|
||||||
// Configure a test server
|
// Configure a test server
|
||||||
ServerBootstrap sb = new ServerBootstrap();
|
ServerBootstrap sb = new ServerBootstrap();
|
||||||
sb.eventLoop(new AsyncEventLoop(), new AsyncEventLoop())
|
sb.eventLoop(loop, loop)
|
||||||
.channel(new AsyncServerSocketChannel())
|
.channel(new AsyncServerSocketChannel())
|
||||||
.localAddress(new InetSocketAddress(9999))
|
.localAddress(new InetSocketAddress(9191))
|
||||||
.childHandler(new ChannelInitializer<AsyncSocketchannel>() {
|
.childHandler(new ChannelInitializer<AsyncSocketChannel>() {
|
||||||
@Override
|
@Override
|
||||||
public void initChannel(AsyncSocketchannel ch) throws Exception {
|
public void initChannel(AsyncSocketChannel ch) throws Exception {
|
||||||
ch.pipeline().addLast(new ChannelInboundByteHandlerAdapter() {
|
ch.pipeline().addLast(new ChannelInboundByteHandlerAdapter() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||||
System.out.print(in.toString(CharsetUtil.US_ASCII));
|
ctx.write(in.slice());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user