diff --git a/README.adoc b/README.adoc new file mode 100644 index 0000000..b947f9d --- /dev/null +++ b/README.adoc @@ -0,0 +1,125 @@ += Netty Incubator Buffer API + +This repository is incubating a new buffer API proposed for Netty 5. + +== Building and Testing + +Short version: just run `make`. + +The project currently relies on snapshot versions of the https://github.com/openjdk/panama-foreign[Panama Foreign] fork of OpenJDK. +This allows us to test out the most recent version of the `jdk.incubator.foreign` APIs, but also make building, and local development more involved. +To simplify things, we have a Docker based build, controlled via a Makefile with the following commands: + +* `image` – build the docker image.This includes building a snapshot of OpenJDK, and download all relevant Maven dependencies. +* `test` – run all tests in a docker container.This implies `image`.The container is automatically deleted afterwards. +* `dbg` – drop into a shell in the build container, without running the build itself.The debugging container is not deleted afterwards. +* `clean` – remove the leftover containers created by `dbg`, `test`, and `build`. +* `build` – build binaries and run all tests in a container, and copy the `target` directory out of the container afterwards.This is the default build target. + +== Example: Echo Client and Server + +Making use of this new buffer API on the client side is quite easy. +Even though Netty 5 does not have native support for these buffers, it is able to convert them to the old `ByteBuf` API as needed. +This means we are able to send incubator buffers through a Netty pipeline, and have it work as if we were sending `ByteBuf` instances. + +[source,java] +---- +public final class Client { + public static void main(String[] args) throws Exception { + EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory()); + try (BufferAllocator allocator = BufferAllocator.pooledDirect()) { // <1> + Bootstrap b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new ChannelHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) { + Buffer message = allocator.allocate(256); // <2> + for (int i = 0; i < message.capacity(); i++) { + message.writeByte((byte) i); + } + ctx.writeAndFlush(message); // <3> + } + }); + } + }); + + // Start the client. + ChannelFuture f = b.connect("127.0.0.1", 8007).sync(); + + // Wait until the connection is closed. + f.channel().closeFuture().sync(); + } finally { + // Shut down the event loop to terminate all threads. + group.shutdownGracefully(); + } + } +} +---- +<1> A life-cycled allocator is created to wrap the scope of our application. +<2> Buffers are allocated with one of the `allocate` methods. +<3> The buffer can then be sent down the pipeline, and will be written to the socket just like a `ByteBuf` would. + +[NOTE] +-- +The same is not the case for `BufferHolder`. +It is not treated the same as a `ByteBufHolder`. +-- + +On the server size, things are more complicated because Netty itself will be allocating the buffers, and the `ByteBufAllocator` API is only capable of returning `ByteBuf` instances. +The `ByteBufAllocatorAdaptor` will allocate `ByteBuf` instances that are backed by the new buffers. +The buffers can then we extracted from the `ByteBuf` instances with the `ByteBufAdaptor.extract` method. + +We can tell a Netty server how to allocate buffers by setting the `ALLOCATOR` child-channel option: + +[source,java] +---- +ByteBufAllocatorAdaptor allocator = new ByteBufAllocatorAdaptor(); // <1> +ServerBootstrap server = new ServerBootstrap(); +server.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childOption(ChannelOption.ALLOCATOR, allocator) // <2> + .handler(new EchoServerHandler()); +---- +<1> The `ByteBufAllocatorAdaptor` implements `ByteBufAllocator`, and directly allocates `ByteBuf` instances that are backed by buffers that use the new API. +<2> To make Netty use a given allocator when allocating buffers for receiving data, we set the allocator as a child option. + +With the above, we just changed how the buffers are allocated, but we haven't changed the API we use for interacting with the buffers. +The buffers are still allocated at `ByteBuf` instances, and flow through the pipeline as such. +If we want to use the new buffer API in our server handlers, we have to extract the buffers from the `ByteBuf` instances that are passed down: + +[source,java] +---- +import io.netty.buffer.ByteBuf; +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.adaptor.ByteBufAdaptor; + +@Sharable +public class EchoServerHandler implements ChannelHandler { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { // <1> + if (msg instanceof ByteBuf) { // <2> + // For this example, we only echo back buffers that are using the new buffer API. + Buffer buf = ByteBufAdaptor.extract((ByteBuf) msg); // <3> + ctx.write(buf); // <4> + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } +} +---- +<1> Netty pipelines are defined as transferring `Object` instances as messages. +<2> When we receive data directly from a socket, these messages will be `ByteBuf` instances with the received data. +<3> Since we set the allocator to create `ByteBuf` instances that are backed by buffers with the new API, we will be able to extract the backing `Buffer` instances. +<4> We can then operate on the extracted `Buffer` instances directly. +The `Buffer` and `ByteBuf` instances mirror each other exactly. +In this case, we just write them back to the client that sent the data to us. + +The files in `src/test/java/io/netty/buffer/api/examples/echo` for the full source code to this example. \ No newline at end of file diff --git a/README.md b/README.md deleted file mode 100644 index 45acec6..0000000 --- a/README.md +++ /dev/null @@ -1,17 +0,0 @@ -# Netty Incubator Buffer API - -This repository is incubating a new buffer API proposed for Netty 5. - -## Building and Testing - -Short version: just run `make`. - -The project currently relies on snapshot versions of the [Panama Foreign](https://github.com/openjdk/panama-foreign) fork of OpenJDK. -This allows us to test out the most recent version of the `jdk.incubator.foreign` APIs, but also make building, and local development more involved. -To simplify things, we have a Docker based build, controlled via a Makefile with the following commands: - -* `image` – build the docker image. This includes building a snapshot of OpenJDK, and download all relevant Maven dependencies. -* `test` – run all tests in a docker container. This implies `image`. The container is automatically deleted afterwards. -* `dbg` – drop into a shell in the build container, without running the build itself. The debugging container is not deleted afterwards. -* `clean` – remove the leftover containers created by `dbg`, `test`, and `build`. -* `build` – build binaries and run all tests in a container, and copy the `target` directory out of the container afterwards. This is the default build target. diff --git a/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java index e55baee..20a2243 100644 --- a/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java +++ b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java @@ -33,7 +33,7 @@ import java.nio.channels.GatheringByteChannel; import java.nio.channels.ScatteringByteChannel; import java.nio.charset.Charset; -public class ByteBufAdaptor extends ByteBuf { +public final class ByteBufAdaptor extends ByteBuf { private final ByteBufAllocatorAdaptor alloc; private final Buffer buffer; @@ -42,6 +42,23 @@ public class ByteBufAdaptor extends ByteBuf { this.buffer = buffer; } + /** + * Extracts the underlying {@link Buffer} instance that is backing this {@link ByteBuf}, if any. + * This is similar to {@link #unwrap()} except the return type is a {@link Buffer}. + * If this {@link ByteBuf} does not wrap a {@link Buffer}, then {@code null} is returned. + * + * @param byteBuf The {@link ByteBuf} to extract the {@link Buffer} from. + * @return The {@link Buffer} instance that is backing the given {@link ByteBuf}, or {@code null} if the given + * {@link ByteBuf} is not backed by a {@link Buffer}. + */ + public static Buffer extract(ByteBuf byteBuf) { + if (byteBuf instanceof ByteBufAdaptor) { + ByteBufAdaptor bba = (ByteBufAdaptor) byteBuf; + return bba.buffer; + } + return null; + } + @Override public int capacity() { return buffer.capacity(); @@ -1005,7 +1022,9 @@ public class ByteBufAdaptor extends ByteBuf { return true; }); int read = (int) in.read(components); - writerIndex(read + writerIndex()); + if (read > 0) { + writerIndex(read + writerIndex()); + } return read; } @@ -1019,7 +1038,10 @@ public class ByteBufAdaptor extends ByteBuf { }); int read = 0; for (ByteBuffer component : components) { - read += in.read(component, position + read); + int r = in.read(component, position + read); + if (r > 0) { + read += r; + } if (component.hasRemaining()) { break; } diff --git a/src/main/java/io/netty/buffer/api/adaptor/ByteBufAllocatorAdaptor.java b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAllocatorAdaptor.java index 22e4d0d..760131b 100644 --- a/src/main/java/io/netty/buffer/api/adaptor/ByteBufAllocatorAdaptor.java +++ b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAllocatorAdaptor.java @@ -98,7 +98,9 @@ public class ByteBufAllocatorAdaptor implements ByteBufAllocator, AutoCloseable @Override public ByteBuf directBuffer(int initialCapacity) { - return new ByteBufAdaptor(this, offheap.allocate(initialCapacity)); + // TODO we cannot use off-heap buffers here, until the JDK allows direct byte buffers based on native + // memory segments to be used in IO operations. + return new ByteBufAdaptor(this, onheap.allocate(initialCapacity)); } @Override diff --git a/src/test/java/io/netty/buffer/api/EchoIT.java b/src/test/java/io/netty/buffer/api/EchoIT.java index 67b2a13..c9ea1fb 100644 --- a/src/test/java/io/netty/buffer/api/EchoIT.java +++ b/src/test/java/io/netty/buffer/api/EchoIT.java @@ -53,18 +53,18 @@ public class EchoIT { try { ServerBootstrap server = new ServerBootstrap(); server.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .option(ChannelOption.ALLOCATOR, allocator) - .option(ChannelOption.SO_BACKLOG, 100) - .handler(new LoggingHandler(LogLevel.INFO)) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline p = ch.pipeline(); - p.addLast(new LoggingHandler(LogLevel.INFO)); - p.addLast(serverHandler); - } - }); + .channel(NioServerSocketChannel.class) + .childOption(ChannelOption.ALLOCATOR, allocator) + .option(ChannelOption.SO_BACKLOG, 100) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(new LoggingHandler(LogLevel.INFO)); + p.addLast(serverHandler); + } + }); // Start the server. ChannelFuture bind = server.bind("localhost", 0).sync(); @@ -115,7 +115,7 @@ public class EchoIT { */ EchoClientHandler() { firstMessage = BufferAllocator.heap().allocate(SIZE); - for (int i = 0; i < SIZE; i ++) { + for (int i = 0; i < SIZE; i++) { firstMessage.writeByte((byte) i); } } diff --git a/src/test/java/io/netty/buffer/api/examples/echo/EchoServer.java b/src/test/java/io/netty/buffer/api/examples/echo/EchoServer.java index 6f28e29..ee235d3 100644 --- a/src/test/java/io/netty/buffer/api/examples/echo/EchoServer.java +++ b/src/test/java/io/netty/buffer/api/examples/echo/EchoServer.java @@ -59,7 +59,7 @@ public final class EchoServer { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) - .option(ChannelOption.ALLOCATOR, allocator) + .childOption(ChannelOption.ALLOCATOR, allocator) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { diff --git a/src/test/java/io/netty/buffer/api/examples/echo/EchoServerHandler.java b/src/test/java/io/netty/buffer/api/examples/echo/EchoServerHandler.java index a3a896e..bc4306c 100644 --- a/src/test/java/io/netty/buffer/api/examples/echo/EchoServerHandler.java +++ b/src/test/java/io/netty/buffer/api/examples/echo/EchoServerHandler.java @@ -15,6 +15,9 @@ */ package io.netty.buffer.api.examples.echo; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.adaptor.ByteBufAdaptor; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; @@ -24,10 +27,13 @@ import io.netty.channel.ChannelHandlerContext; */ @Sharable public class EchoServerHandler implements ChannelHandler { - @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { - ctx.write(msg); + if (msg instanceof ByteBuf) { + // For this example, we only echo back buffers that are using the new buffer API. + Buffer buf = ByteBufAdaptor.extract((ByteBuf) msg); + ctx.write(buf); + } } @Override