Update proxy example to show to implement a proxy with manual read operations for optimal memory usage
This commit is contained in:
parent
2f737d4e70
commit
e5c326949d
@ -43,8 +43,8 @@ public class HexDumpProxy {
|
||||
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
|
||||
.channel(NioServerSocketChannel.class)
|
||||
.childHandler(new HexDumpProxyInitializer(remoteHost, remotePort))
|
||||
.childOption(ChannelOption.AUTO_READ, false);
|
||||
b.bind(localPort).sync().channel().closeFuture().sync();
|
||||
.childOption(ChannelOption.AUTO_READ, false)
|
||||
.bind(localPort).sync().channel().closeFuture().sync();
|
||||
} finally {
|
||||
b.shutdown();
|
||||
}
|
||||
|
@ -17,6 +17,8 @@ package io.netty.example.proxy;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundByteHandlerAdapter;
|
||||
|
||||
@ -30,14 +32,24 @@ public class HexDumpProxyBackendHandler extends ChannelInboundByteHandlerAdapter
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.read();
|
||||
ctx.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
public void inboundBufferUpdated(final ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
ByteBuf out = inboundChannel.outboundByteBuffer();
|
||||
out.writeBytes(in);
|
||||
inboundChannel.flush();
|
||||
inboundChannel.flush().addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (future.isSuccess()) {
|
||||
ctx.channel().read();
|
||||
} else {
|
||||
future.channel().close();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -22,6 +22,7 @@ import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundByteHandlerAdapter;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
|
||||
public class HexDumpProxyFrontendHandler extends ChannelInboundByteHandlerAdapter {
|
||||
@ -46,16 +47,16 @@ public class HexDumpProxyFrontendHandler extends ChannelInboundByteHandlerAdapte
|
||||
Bootstrap b = new Bootstrap();
|
||||
b.group(inboundChannel.eventLoop())
|
||||
.channel(NioSocketChannel.class)
|
||||
.handler(new HexDumpProxyBackendHandler(inboundChannel));
|
||||
|
||||
.handler(new HexDumpProxyBackendHandler(inboundChannel))
|
||||
.option(ChannelOption.AUTO_READ, false);
|
||||
ChannelFuture f = b.connect(remoteHost, remotePort);
|
||||
outboundChannel = f.channel();
|
||||
f.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (future.isSuccess()) {
|
||||
// connection complete start to auto read data
|
||||
inboundChannel.config().setAutoRead(true);
|
||||
// connection complete start to read first data
|
||||
inboundChannel.read();
|
||||
} else {
|
||||
// Close the connection if the connection attempt has failed.
|
||||
inboundChannel.close();
|
||||
@ -65,11 +66,21 @@ public class HexDumpProxyFrontendHandler extends ChannelInboundByteHandlerAdapte
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
public void inboundBufferUpdated(final ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
ByteBuf out = outboundChannel.outboundByteBuffer();
|
||||
out.writeBytes(in);
|
||||
if (outboundChannel.isActive()) {
|
||||
outboundChannel.flush();
|
||||
outboundChannel.flush().addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (future.isSuccess()) {
|
||||
// was able to flush out data, start to read the next chunk
|
||||
ctx.channel().read();
|
||||
} else {
|
||||
future.channel().close();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user