write all messages

Motivation:

write until there is nothing left in the buffer

Modification:

eventloop executes the next write event

Result:
write all messages
This commit is contained in:
Josef Grieb 2020-07-28 21:36:11 +02:00
parent df84759128
commit a29b8c5cb3
2 changed files with 10 additions and 4 deletions

View File

@ -165,10 +165,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
unsafe.executeUringReadOperator(); unsafe.executeUringReadOperator();
} }
//Channel/ChannelHandlerContext.write
@Override @Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception { protected void doWrite(ChannelOutboundBuffer in) throws Exception {
//Todo write until there is nothing left in the buffer
if (in.size() >= 1) { if (in.size() >= 1) {
Object msg = in.current(); Object msg = in.current();
if (msg instanceof ByteBuf) { if (msg instanceof ByteBuf) {

View File

@ -16,6 +16,7 @@
package io.netty.channel.uring; package io.netty.channel.uring;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.SingleThreadEventLoop;
@ -163,12 +164,19 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
break; break;
case WRITE: case WRITE:
System.out.println("EventLoop Write Res: " + ioUringCqe.getRes()); System.out.println("EventLoop Write Res: " + ioUringCqe.getRes());
System.out.println("EventLoop Fd: " + event.getAbstractIOUringChannel().getSocket().getFd()); System.out.println("EventLoop Fd: " + event.getAbstractIOUringChannel().getSocket().intValue());
System.out.println("EventLoop Pipeline: " + event.getAbstractIOUringChannel().eventLoop()); System.out.println("EventLoop Pipeline: " + event.getAbstractIOUringChannel().eventLoop());
ChannelOutboundBuffer channelOutboundBuffer = event
.getAbstractIOUringChannel().unsafe().outboundBuffer();
//remove bytes //remove bytes
int localFlushAmount = ioUringCqe.getRes(); int localFlushAmount = ioUringCqe.getRes();
if (localFlushAmount > 0) { if (localFlushAmount > 0) {
event.getAbstractIOUringChannel().unsafe().outboundBuffer().removeBytes(localFlushAmount); channelOutboundBuffer.removeBytes(localFlushAmount);
}
try {
event.getAbstractIOUringChannel().doWrite(channelOutboundBuffer);
} catch (Exception e) {
e.printStackTrace();
} }
break; break;
} }