Update docs and examples
This commit is contained in:
parent
1b65bf9a23
commit
78f04eeb49
125
README.adoc
Normal file
125
README.adoc
Normal file
@ -0,0 +1,125 @@
|
|||||||
|
= Netty Incubator Buffer API
|
||||||
|
|
||||||
|
This repository is incubating a new buffer API proposed for Netty 5.
|
||||||
|
|
||||||
|
== Building and Testing
|
||||||
|
|
||||||
|
Short version: just run `make`.
|
||||||
|
|
||||||
|
The project currently relies on snapshot versions of the https://github.com/openjdk/panama-foreign[Panama Foreign] fork of OpenJDK.
|
||||||
|
This allows us to test out the most recent version of the `jdk.incubator.foreign` APIs, but also make building, and local development more involved.
|
||||||
|
To simplify things, we have a Docker based build, controlled via a Makefile with the following commands:
|
||||||
|
|
||||||
|
* `image` – build the docker image.This includes building a snapshot of OpenJDK, and download all relevant Maven dependencies.
|
||||||
|
* `test` – run all tests in a docker container.This implies `image`.The container is automatically deleted afterwards.
|
||||||
|
* `dbg` – drop into a shell in the build container, without running the build itself.The debugging container is not deleted afterwards.
|
||||||
|
* `clean` – remove the leftover containers created by `dbg`, `test`, and `build`.
|
||||||
|
* `build` – build binaries and run all tests in a container, and copy the `target` directory out of the container afterwards.This is the default build target.
|
||||||
|
|
||||||
|
== Example: Echo Client and Server
|
||||||
|
|
||||||
|
Making use of this new buffer API on the client side is quite easy.
|
||||||
|
Even though Netty 5 does not have native support for these buffers, it is able to convert them to the old `ByteBuf` API as needed.
|
||||||
|
This means we are able to send incubator buffers through a Netty pipeline, and have it work as if we were sending `ByteBuf` instances.
|
||||||
|
|
||||||
|
[source,java]
|
||||||
|
----
|
||||||
|
public final class Client {
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory());
|
||||||
|
try (BufferAllocator allocator = BufferAllocator.pooledDirect()) { // <1>
|
||||||
|
Bootstrap b = new Bootstrap();
|
||||||
|
b.group(group)
|
||||||
|
.channel(NioSocketChannel.class)
|
||||||
|
.option(ChannelOption.TCP_NODELAY, true)
|
||||||
|
.handler(new ChannelInitializer<SocketChannel>() {
|
||||||
|
@Override
|
||||||
|
public void initChannel(SocketChannel ch) throws Exception {
|
||||||
|
ch.pipeline().addLast(new ChannelHandlerAdapter() {
|
||||||
|
@Override
|
||||||
|
public void channelActive(ChannelHandlerContext ctx) {
|
||||||
|
Buffer message = allocator.allocate(256); // <2>
|
||||||
|
for (int i = 0; i < message.capacity(); i++) {
|
||||||
|
message.writeByte((byte) i);
|
||||||
|
}
|
||||||
|
ctx.writeAndFlush(message); // <3>
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Start the client.
|
||||||
|
ChannelFuture f = b.connect("127.0.0.1", 8007).sync();
|
||||||
|
|
||||||
|
// Wait until the connection is closed.
|
||||||
|
f.channel().closeFuture().sync();
|
||||||
|
} finally {
|
||||||
|
// Shut down the event loop to terminate all threads.
|
||||||
|
group.shutdownGracefully();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
----
|
||||||
|
<1> A life-cycled allocator is created to wrap the scope of our application.
|
||||||
|
<2> Buffers are allocated with one of the `allocate` methods.
|
||||||
|
<3> The buffer can then be sent down the pipeline, and will be written to the socket just like a `ByteBuf` would.
|
||||||
|
|
||||||
|
[NOTE]
|
||||||
|
--
|
||||||
|
The same is not the case for `BufferHolder`.
|
||||||
|
It is not treated the same as a `ByteBufHolder`.
|
||||||
|
--
|
||||||
|
|
||||||
|
On the server size, things are more complicated because Netty itself will be allocating the buffers, and the `ByteBufAllocator` API is only capable of returning `ByteBuf` instances.
|
||||||
|
The `ByteBufAllocatorAdaptor` will allocate `ByteBuf` instances that are backed by the new buffers.
|
||||||
|
The buffers can then we extracted from the `ByteBuf` instances with the `ByteBufAdaptor.extract` method.
|
||||||
|
|
||||||
|
We can tell a Netty server how to allocate buffers by setting the `ALLOCATOR` child-channel option:
|
||||||
|
|
||||||
|
[source,java]
|
||||||
|
----
|
||||||
|
ByteBufAllocatorAdaptor allocator = new ByteBufAllocatorAdaptor(); // <1>
|
||||||
|
ServerBootstrap server = new ServerBootstrap();
|
||||||
|
server.group(bossGroup, workerGroup)
|
||||||
|
.channel(NioServerSocketChannel.class)
|
||||||
|
.childOption(ChannelOption.ALLOCATOR, allocator) // <2>
|
||||||
|
.handler(new EchoServerHandler());
|
||||||
|
----
|
||||||
|
<1> The `ByteBufAllocatorAdaptor` implements `ByteBufAllocator`, and directly allocates `ByteBuf` instances that are backed by buffers that use the new API.
|
||||||
|
<2> To make Netty use a given allocator when allocating buffers for receiving data, we set the allocator as a child option.
|
||||||
|
|
||||||
|
With the above, we just changed how the buffers are allocated, but we haven't changed the API we use for interacting with the buffers.
|
||||||
|
The buffers are still allocated at `ByteBuf` instances, and flow through the pipeline as such.
|
||||||
|
If we want to use the new buffer API in our server handlers, we have to extract the buffers from the `ByteBuf` instances that are passed down:
|
||||||
|
|
||||||
|
[source,java]
|
||||||
|
----
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.api.Buffer;
|
||||||
|
import io.netty.buffer.api.adaptor.ByteBufAdaptor;
|
||||||
|
|
||||||
|
@Sharable
|
||||||
|
public class EchoServerHandler implements ChannelHandler {
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg) { // <1>
|
||||||
|
if (msg instanceof ByteBuf) { // <2>
|
||||||
|
// For this example, we only echo back buffers that are using the new buffer API.
|
||||||
|
Buffer buf = ByteBufAdaptor.extract((ByteBuf) msg); // <3>
|
||||||
|
ctx.write(buf); // <4>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelReadComplete(ChannelHandlerContext ctx) {
|
||||||
|
ctx.flush();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
----
|
||||||
|
<1> Netty pipelines are defined as transferring `Object` instances as messages.
|
||||||
|
<2> When we receive data directly from a socket, these messages will be `ByteBuf` instances with the received data.
|
||||||
|
<3> Since we set the allocator to create `ByteBuf` instances that are backed by buffers with the new API, we will be able to extract the backing `Buffer` instances.
|
||||||
|
<4> We can then operate on the extracted `Buffer` instances directly.
|
||||||
|
The `Buffer` and `ByteBuf` instances mirror each other exactly.
|
||||||
|
In this case, we just write them back to the client that sent the data to us.
|
||||||
|
|
||||||
|
The files in `src/test/java/io/netty/buffer/api/examples/echo` for the full source code to this example.
|
17
README.md
17
README.md
@ -1,17 +0,0 @@
|
|||||||
# Netty Incubator Buffer API
|
|
||||||
|
|
||||||
This repository is incubating a new buffer API proposed for Netty 5.
|
|
||||||
|
|
||||||
## Building and Testing
|
|
||||||
|
|
||||||
Short version: just run `make`.
|
|
||||||
|
|
||||||
The project currently relies on snapshot versions of the [Panama Foreign](https://github.com/openjdk/panama-foreign) fork of OpenJDK.
|
|
||||||
This allows us to test out the most recent version of the `jdk.incubator.foreign` APIs, but also make building, and local development more involved.
|
|
||||||
To simplify things, we have a Docker based build, controlled via a Makefile with the following commands:
|
|
||||||
|
|
||||||
* `image` – build the docker image. This includes building a snapshot of OpenJDK, and download all relevant Maven dependencies.
|
|
||||||
* `test` – run all tests in a docker container. This implies `image`. The container is automatically deleted afterwards.
|
|
||||||
* `dbg` – drop into a shell in the build container, without running the build itself. The debugging container is not deleted afterwards.
|
|
||||||
* `clean` – remove the leftover containers created by `dbg`, `test`, and `build`.
|
|
||||||
* `build` – build binaries and run all tests in a container, and copy the `target` directory out of the container afterwards. This is the default build target.
|
|
@ -33,7 +33,7 @@ import java.nio.channels.GatheringByteChannel;
|
|||||||
import java.nio.channels.ScatteringByteChannel;
|
import java.nio.channels.ScatteringByteChannel;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
|
|
||||||
public class ByteBufAdaptor extends ByteBuf {
|
public final class ByteBufAdaptor extends ByteBuf {
|
||||||
private final ByteBufAllocatorAdaptor alloc;
|
private final ByteBufAllocatorAdaptor alloc;
|
||||||
private final Buffer buffer;
|
private final Buffer buffer;
|
||||||
|
|
||||||
@ -42,6 +42,23 @@ public class ByteBufAdaptor extends ByteBuf {
|
|||||||
this.buffer = buffer;
|
this.buffer = buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extracts the underlying {@link Buffer} instance that is backing this {@link ByteBuf}, if any.
|
||||||
|
* This is similar to {@link #unwrap()} except the return type is a {@link Buffer}.
|
||||||
|
* If this {@link ByteBuf} does not wrap a {@link Buffer}, then {@code null} is returned.
|
||||||
|
*
|
||||||
|
* @param byteBuf The {@link ByteBuf} to extract the {@link Buffer} from.
|
||||||
|
* @return The {@link Buffer} instance that is backing the given {@link ByteBuf}, or {@code null} if the given
|
||||||
|
* {@link ByteBuf} is not backed by a {@link Buffer}.
|
||||||
|
*/
|
||||||
|
public static Buffer extract(ByteBuf byteBuf) {
|
||||||
|
if (byteBuf instanceof ByteBufAdaptor) {
|
||||||
|
ByteBufAdaptor bba = (ByteBufAdaptor) byteBuf;
|
||||||
|
return bba.buffer;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int capacity() {
|
public int capacity() {
|
||||||
return buffer.capacity();
|
return buffer.capacity();
|
||||||
@ -1005,7 +1022,9 @@ public class ByteBufAdaptor extends ByteBuf {
|
|||||||
return true;
|
return true;
|
||||||
});
|
});
|
||||||
int read = (int) in.read(components);
|
int read = (int) in.read(components);
|
||||||
writerIndex(read + writerIndex());
|
if (read > 0) {
|
||||||
|
writerIndex(read + writerIndex());
|
||||||
|
}
|
||||||
return read;
|
return read;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1019,7 +1038,10 @@ public class ByteBufAdaptor extends ByteBuf {
|
|||||||
});
|
});
|
||||||
int read = 0;
|
int read = 0;
|
||||||
for (ByteBuffer component : components) {
|
for (ByteBuffer component : components) {
|
||||||
read += in.read(component, position + read);
|
int r = in.read(component, position + read);
|
||||||
|
if (r > 0) {
|
||||||
|
read += r;
|
||||||
|
}
|
||||||
if (component.hasRemaining()) {
|
if (component.hasRemaining()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -98,7 +98,9 @@ public class ByteBufAllocatorAdaptor implements ByteBufAllocator, AutoCloseable
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ByteBuf directBuffer(int initialCapacity) {
|
public ByteBuf directBuffer(int initialCapacity) {
|
||||||
return new ByteBufAdaptor(this, offheap.allocate(initialCapacity));
|
// TODO we cannot use off-heap buffers here, until the JDK allows direct byte buffers based on native
|
||||||
|
// memory segments to be used in IO operations.
|
||||||
|
return new ByteBufAdaptor(this, onheap.allocate(initialCapacity));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -53,18 +53,18 @@ public class EchoIT {
|
|||||||
try {
|
try {
|
||||||
ServerBootstrap server = new ServerBootstrap();
|
ServerBootstrap server = new ServerBootstrap();
|
||||||
server.group(bossGroup, workerGroup)
|
server.group(bossGroup, workerGroup)
|
||||||
.channel(NioServerSocketChannel.class)
|
.channel(NioServerSocketChannel.class)
|
||||||
.option(ChannelOption.ALLOCATOR, allocator)
|
.childOption(ChannelOption.ALLOCATOR, allocator)
|
||||||
.option(ChannelOption.SO_BACKLOG, 100)
|
.option(ChannelOption.SO_BACKLOG, 100)
|
||||||
.handler(new LoggingHandler(LogLevel.INFO))
|
.handler(new LoggingHandler(LogLevel.INFO))
|
||||||
.childHandler(new ChannelInitializer<SocketChannel>() {
|
.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||||
@Override
|
@Override
|
||||||
public void initChannel(SocketChannel ch) throws Exception {
|
public void initChannel(SocketChannel ch) throws Exception {
|
||||||
ChannelPipeline p = ch.pipeline();
|
ChannelPipeline p = ch.pipeline();
|
||||||
p.addLast(new LoggingHandler(LogLevel.INFO));
|
p.addLast(new LoggingHandler(LogLevel.INFO));
|
||||||
p.addLast(serverHandler);
|
p.addLast(serverHandler);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Start the server.
|
// Start the server.
|
||||||
ChannelFuture bind = server.bind("localhost", 0).sync();
|
ChannelFuture bind = server.bind("localhost", 0).sync();
|
||||||
@ -115,7 +115,7 @@ public class EchoIT {
|
|||||||
*/
|
*/
|
||||||
EchoClientHandler() {
|
EchoClientHandler() {
|
||||||
firstMessage = BufferAllocator.heap().allocate(SIZE);
|
firstMessage = BufferAllocator.heap().allocate(SIZE);
|
||||||
for (int i = 0; i < SIZE; i ++) {
|
for (int i = 0; i < SIZE; i++) {
|
||||||
firstMessage.writeByte((byte) i);
|
firstMessage.writeByte((byte) i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -59,7 +59,7 @@ public final class EchoServer {
|
|||||||
ServerBootstrap b = new ServerBootstrap();
|
ServerBootstrap b = new ServerBootstrap();
|
||||||
b.group(bossGroup, workerGroup)
|
b.group(bossGroup, workerGroup)
|
||||||
.channel(NioServerSocketChannel.class)
|
.channel(NioServerSocketChannel.class)
|
||||||
.option(ChannelOption.ALLOCATOR, allocator)
|
.childOption(ChannelOption.ALLOCATOR, allocator)
|
||||||
.option(ChannelOption.SO_BACKLOG, 100)
|
.option(ChannelOption.SO_BACKLOG, 100)
|
||||||
.handler(new LoggingHandler(LogLevel.INFO))
|
.handler(new LoggingHandler(LogLevel.INFO))
|
||||||
.childHandler(new ChannelInitializer<SocketChannel>() {
|
.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||||
|
@ -15,6 +15,9 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.buffer.api.examples.echo;
|
package io.netty.buffer.api.examples.echo;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.api.Buffer;
|
||||||
|
import io.netty.buffer.api.adaptor.ByteBufAdaptor;
|
||||||
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelHandler;
|
||||||
import io.netty.channel.ChannelHandler.Sharable;
|
import io.netty.channel.ChannelHandler.Sharable;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
@ -24,10 +27,13 @@ import io.netty.channel.ChannelHandlerContext;
|
|||||||
*/
|
*/
|
||||||
@Sharable
|
@Sharable
|
||||||
public class EchoServerHandler implements ChannelHandler {
|
public class EchoServerHandler implements ChannelHandler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||||
ctx.write(msg);
|
if (msg instanceof ByteBuf) {
|
||||||
|
// For this example, we only echo back buffers that are using the new buffer API.
|
||||||
|
Buffer buf = ByteBufAdaptor.extract((ByteBuf) msg);
|
||||||
|
ctx.write(buf);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
Reference in New Issue
Block a user