diff --git a/README.adoc b/README.adoc new file mode 100644 index 0000000..b947f9d --- /dev/null +++ b/README.adoc @@ -0,0 +1,125 @@ += Netty Incubator Buffer API + +This repository is incubating a new buffer API proposed for Netty 5. + +== Building and Testing + +Short version: just run `make`. + +The project currently relies on snapshot versions of the https://github.com/openjdk/panama-foreign[Panama Foreign] fork of OpenJDK. +This allows us to test out the most recent version of the `jdk.incubator.foreign` APIs, but also make building, and local development more involved. +To simplify things, we have a Docker based build, controlled via a Makefile with the following commands: + +* `image` – build the docker image.This includes building a snapshot of OpenJDK, and download all relevant Maven dependencies. +* `test` – run all tests in a docker container.This implies `image`.The container is automatically deleted afterwards. +* `dbg` – drop into a shell in the build container, without running the build itself.The debugging container is not deleted afterwards. +* `clean` – remove the leftover containers created by `dbg`, `test`, and `build`. +* `build` – build binaries and run all tests in a container, and copy the `target` directory out of the container afterwards.This is the default build target. + +== Example: Echo Client and Server + +Making use of this new buffer API on the client side is quite easy. +Even though Netty 5 does not have native support for these buffers, it is able to convert them to the old `ByteBuf` API as needed. +This means we are able to send incubator buffers through a Netty pipeline, and have it work as if we were sending `ByteBuf` instances. + +[source,java] +---- +public final class Client { + public static void main(String[] args) throws Exception { + EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory()); + try (BufferAllocator allocator = BufferAllocator.pooledDirect()) { // <1> + Bootstrap b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new ChannelHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) { + Buffer message = allocator.allocate(256); // <2> + for (int i = 0; i < message.capacity(); i++) { + message.writeByte((byte) i); + } + ctx.writeAndFlush(message); // <3> + } + }); + } + }); + + // Start the client. + ChannelFuture f = b.connect("127.0.0.1", 8007).sync(); + + // Wait until the connection is closed. + f.channel().closeFuture().sync(); + } finally { + // Shut down the event loop to terminate all threads. + group.shutdownGracefully(); + } + } +} +---- +<1> A life-cycled allocator is created to wrap the scope of our application. +<2> Buffers are allocated with one of the `allocate` methods. +<3> The buffer can then be sent down the pipeline, and will be written to the socket just like a `ByteBuf` would. + +[NOTE] +-- +The same is not the case for `BufferHolder`. +It is not treated the same as a `ByteBufHolder`. +-- + +On the server size, things are more complicated because Netty itself will be allocating the buffers, and the `ByteBufAllocator` API is only capable of returning `ByteBuf` instances. +The `ByteBufAllocatorAdaptor` will allocate `ByteBuf` instances that are backed by the new buffers. +The buffers can then we extracted from the `ByteBuf` instances with the `ByteBufAdaptor.extract` method. + +We can tell a Netty server how to allocate buffers by setting the `ALLOCATOR` child-channel option: + +[source,java] +---- +ByteBufAllocatorAdaptor allocator = new ByteBufAllocatorAdaptor(); // <1> +ServerBootstrap server = new ServerBootstrap(); +server.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childOption(ChannelOption.ALLOCATOR, allocator) // <2> + .handler(new EchoServerHandler()); +---- +<1> The `ByteBufAllocatorAdaptor` implements `ByteBufAllocator`, and directly allocates `ByteBuf` instances that are backed by buffers that use the new API. +<2> To make Netty use a given allocator when allocating buffers for receiving data, we set the allocator as a child option. + +With the above, we just changed how the buffers are allocated, but we haven't changed the API we use for interacting with the buffers. +The buffers are still allocated at `ByteBuf` instances, and flow through the pipeline as such. +If we want to use the new buffer API in our server handlers, we have to extract the buffers from the `ByteBuf` instances that are passed down: + +[source,java] +---- +import io.netty.buffer.ByteBuf; +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.adaptor.ByteBufAdaptor; + +@Sharable +public class EchoServerHandler implements ChannelHandler { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { // <1> + if (msg instanceof ByteBuf) { // <2> + // For this example, we only echo back buffers that are using the new buffer API. + Buffer buf = ByteBufAdaptor.extract((ByteBuf) msg); // <3> + ctx.write(buf); // <4> + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } +} +---- +<1> Netty pipelines are defined as transferring `Object` instances as messages. +<2> When we receive data directly from a socket, these messages will be `ByteBuf` instances with the received data. +<3> Since we set the allocator to create `ByteBuf` instances that are backed by buffers with the new API, we will be able to extract the backing `Buffer` instances. +<4> We can then operate on the extracted `Buffer` instances directly. +The `Buffer` and `ByteBuf` instances mirror each other exactly. +In this case, we just write them back to the client that sent the data to us. + +The files in `src/test/java/io/netty/buffer/api/examples/echo` for the full source code to this example. \ No newline at end of file diff --git a/README.md b/README.md deleted file mode 100644 index 45acec6..0000000 --- a/README.md +++ /dev/null @@ -1,17 +0,0 @@ -# Netty Incubator Buffer API - -This repository is incubating a new buffer API proposed for Netty 5. - -## Building and Testing - -Short version: just run `make`. - -The project currently relies on snapshot versions of the [Panama Foreign](https://github.com/openjdk/panama-foreign) fork of OpenJDK. -This allows us to test out the most recent version of the `jdk.incubator.foreign` APIs, but also make building, and local development more involved. -To simplify things, we have a Docker based build, controlled via a Makefile with the following commands: - -* `image` – build the docker image. This includes building a snapshot of OpenJDK, and download all relevant Maven dependencies. -* `test` – run all tests in a docker container. This implies `image`. The container is automatically deleted afterwards. -* `dbg` – drop into a shell in the build container, without running the build itself. The debugging container is not deleted afterwards. -* `clean` – remove the leftover containers created by `dbg`, `test`, and `build`. -* `build` – build binaries and run all tests in a container, and copy the `target` directory out of the container afterwards. This is the default build target. diff --git a/pom.xml b/pom.xml index a3321b0..49b74f2 100644 --- a/pom.xml +++ b/pom.xml @@ -394,6 +394,12 @@ ${netty.build.version} test + + io.netty + netty-handler + ${netty.version} + test + org.openjdk.jmh jmh-core diff --git a/src/main/java/io/netty/buffer/api/BufferHolder.java b/src/main/java/io/netty/buffer/api/BufferHolder.java index 13f5c01..fd01d31 100644 --- a/src/main/java/io/netty/buffer/api/BufferHolder.java +++ b/src/main/java/io/netty/buffer/api/BufferHolder.java @@ -190,4 +190,9 @@ public abstract class BufferHolder> implements Rc { protected final Buffer getBufVolatile() { return (Buffer) BUF.getVolatile(this); } + + @Override + public boolean isAccessible() { + return buf.isAccessible(); + } } diff --git a/src/main/java/io/netty/buffer/api/Rc.java b/src/main/java/io/netty/buffer/api/Rc.java index 2cc50f7..beec593 100644 --- a/src/main/java/io/netty/buffer/api/Rc.java +++ b/src/main/java/io/netty/buffer/api/Rc.java @@ -87,4 +87,13 @@ public interface Rc> extends AutoCloseable, Deref { * @return The number of borrows, if any, of this object. */ int countBorrows(); + + /** + * Check if this object is accessible. + * + * @return {@code true} if this object is still valid and can be accessed, + * otherwise {@code false} if, for instance, this object has been dropped/deallocated, + * or been {@linkplain #send() sent} elsewhere. + */ + boolean isAccessible(); } diff --git a/src/main/java/io/netty/buffer/api/RcSupport.java b/src/main/java/io/netty/buffer/api/RcSupport.java index 1541810..2d1003f 100644 --- a/src/main/java/io/netty/buffer/api/RcSupport.java +++ b/src/main/java/io/netty/buffer/api/RcSupport.java @@ -102,6 +102,11 @@ public abstract class RcSupport, T extends RcSupport> impl return Math.max(acquires, 0); } + @Override + public boolean isAccessible() { + return acquires >= 0; + } + /** * Prepare this instance for ownsership transfer. This method is called from {@link #send()} in the sending thread. * This method should put this Rc in a deactivated state where it is no longer accessible from the currently owning diff --git a/src/main/java/io/netty/buffer/api/adaptor/BufferIntegratable.java b/src/main/java/io/netty/buffer/api/adaptor/BufferIntegratable.java new file mode 100644 index 0000000..fe9a73d --- /dev/null +++ b/src/main/java/io/netty/buffer/api/adaptor/BufferIntegratable.java @@ -0,0 +1,25 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.adaptor; + +import io.netty.buffer.ByteBufConvertible; +import io.netty.util.ReferenceCounted; + +/** + * Interfaces that are required for an object to stand-in for a {@link io.netty.buffer.ByteBuf} in Netty. + */ +public interface BufferIntegratable extends ByteBufConvertible, ReferenceCounted { +} diff --git a/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java new file mode 100644 index 0000000..20a2243 --- /dev/null +++ b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java @@ -0,0 +1,1320 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.adaptor; + +import io.netty.buffer.ByteBufConvertible; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.BufferAllocator; +import io.netty.util.ByteProcessor; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.charset.Charset; + +public final class ByteBufAdaptor extends ByteBuf { + private final ByteBufAllocatorAdaptor alloc; + private final Buffer buffer; + + public ByteBufAdaptor(ByteBufAllocatorAdaptor alloc, Buffer buffer) { + this.alloc = alloc; + this.buffer = buffer; + } + + /** + * Extracts the underlying {@link Buffer} instance that is backing this {@link ByteBuf}, if any. + * This is similar to {@link #unwrap()} except the return type is a {@link Buffer}. + * If this {@link ByteBuf} does not wrap a {@link Buffer}, then {@code null} is returned. + * + * @param byteBuf The {@link ByteBuf} to extract the {@link Buffer} from. + * @return The {@link Buffer} instance that is backing the given {@link ByteBuf}, or {@code null} if the given + * {@link ByteBuf} is not backed by a {@link Buffer}. + */ + public static Buffer extract(ByteBuf byteBuf) { + if (byteBuf instanceof ByteBufAdaptor) { + ByteBufAdaptor bba = (ByteBufAdaptor) byteBuf; + return bba.buffer; + } + return null; + } + + @Override + public int capacity() { + return buffer.capacity(); + } + + @Override + public ByteBuf capacity(int newCapacity) { + int diff = newCapacity - capacity() - buffer.writableBytes(); + if (diff > 0) { + buffer.ensureWritable(diff); + } + return this; + } + + @Override + public int maxCapacity() { + return capacity(); + } + + @Override + public ByteBufAllocator alloc() { + return alloc; + } + + @Override + public ByteOrder order() { + return buffer.order(); + } + + @Override + public ByteBuf order(ByteOrder endianness) { + buffer.order(endianness); + return this; + } + + @Override + public ByteBuf unwrap() { + return null; + } + + @Override + public boolean isDirect() { + return buffer.nativeAddress() != 0; + } + + @Override + public boolean isReadOnly() { + return buffer.readOnly(); + } + + @Override + public ByteBuf asReadOnly() { + return Unpooled.unreleasableBuffer(this); + } + + @Override + public int readerIndex() { + return buffer.readerOffset(); + } + + @Override + public ByteBuf readerIndex(int readerIndex) { + buffer.readerOffset(readerIndex); + return this; + } + + @Override + public int writerIndex() { + return buffer.writerOffset(); + } + + @Override + public ByteBuf writerIndex(int writerIndex) { + buffer.writerOffset(writerIndex); + return this; + } + + @Override + public ByteBuf setIndex(int readerIndex, int writerIndex) { + return readerIndex(readerIndex).writerIndex(writerIndex); + } + + @Override + public int readableBytes() { + return buffer.readableBytes(); + } + + @Override + public int writableBytes() { + return buffer.writableBytes(); + } + + @Override + public int maxWritableBytes() { + return writableBytes(); + } + + @Override + public boolean isReadable() { + return readableBytes() > 0; + } + + @Override + public boolean isReadable(int size) { + return readableBytes() >= size; + } + + @Override + public boolean isWritable() { + return writableBytes() > 0; + } + + @Override + public boolean isWritable(int size) { + return writableBytes() >= size; + } + + @Override + public ByteBuf clear() { + return setIndex(0, 0); + } + + @Override + public ByteBuf discardReadBytes() { + buffer.compact(); + return this; + } + + @Override + public ByteBuf discardSomeReadBytes() { + return discardReadBytes(); + } + + @Override + public ByteBuf ensureWritable(int minWritableBytes) { + buffer.ensureWritable(minWritableBytes); + return this; + } + + @Override + public int ensureWritable(int minWritableBytes, boolean force) { + buffer.ensureWritable(minWritableBytes); + return minWritableBytes; + } + + @Override + public boolean getBoolean(int index) { + return getByte(index) != 0; + } + + @Override + public byte getByte(int index) { + return buffer.getByte(index); + } + + @Override + public short getUnsignedByte(int index) { + return (short) buffer.getUnsignedByte(index); + } + + @Override + public short getShort(int index) { + return buffer.getShort(index); + } + + @Override + public short getShortLE(int index) { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).getShort(index); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public int getUnsignedShort(int index) { + return buffer.getUnsignedShort(index); + } + + @Override + public int getUnsignedShortLE(int index) { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).getUnsignedShort(index); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public int getMedium(int index) { + return buffer.getMedium(index); + } + + @Override + public int getMediumLE(int index) { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).getMedium(index); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public int getUnsignedMedium(int index) { + return buffer.getUnsignedMedium(index); + } + + @Override + public int getUnsignedMediumLE(int index) { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).getUnsignedMedium(index); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public int getInt(int index) { + return buffer.getInt(index); + } + + @Override + public int getIntLE(int index) { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).getInt(index); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public long getUnsignedInt(int index) { + return buffer.getUnsignedInt(index); + } + + @Override + public long getUnsignedIntLE(int index) { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).getUnsignedInt(index); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public long getLong(int index) { + return buffer.getLong(index); + } + + @Override + public long getLongLE(int index) { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).getLong(index); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public char getChar(int index) { + return buffer.getChar(index); + } + + @Override + public float getFloat(int index) { + return buffer.getFloat(index); + } + + @Override + public double getDouble(int index) { + return buffer.getDouble(index); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst) { + while (dst.isWritable()) { + dst.writeByte(getByte(index++)); + } + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int length) { + for (int i = 0; i < length; i++) { + dst.writeByte(getByte(index + i)); + } + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + for (int i = 0; i < length; i++) { + dst.setByte(dstIndex + i, getByte(index + i)); + } + return this; + } + + @Override + public ByteBuf getBytes(int index, byte[] dst) { + return getBytes(index, dst, 0, dst.length); + } + + @Override + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + for (int i = 0; i < length; i++) { + dst[dstIndex + i] = getByte(index + i); + } + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + while (dst.hasRemaining()) { + dst.put(getByte(index)); + index++; + } + return this; + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + for (int i = 0; i < length; i++) { + out.write(getByte(index + i)); + } + return this; + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { + ByteBuffer transfer = ByteBuffer.allocate(length); + buffer.copyInto(index, transfer, 0, length); + return out.write(transfer); + } + + @Override + public int getBytes(int index, FileChannel out, long position, int length) throws IOException { + ByteBuffer transfer = ByteBuffer.allocate(length); + buffer.copyInto(index, transfer, 0, length); + return out.write(transfer, position); + } + + @Override + public CharSequence getCharSequence(int index, int length, Charset charset) { + byte[] bytes = new byte[length]; + getBytes(index, bytes); + return new String(bytes, charset); + } + + @Override + public ByteBuf setBoolean(int index, boolean value) { + return setByte(index, value? 1 : 0); + } + + @Override + public ByteBuf setByte(int index, int value) { + buffer.setByte(index, (byte) value); + return this; + } + + @Override + public ByteBuf setShort(int index, int value) { + buffer.setShort(index, (short) value); + return this; + } + + @Override + public ByteBuf setShortLE(int index, int value) { + ByteOrder originalOrder = buffer.order(); + try { + buffer.order(ByteOrder.LITTLE_ENDIAN).setShort(index, (short) value); + return this; + } finally { + buffer.order(originalOrder); + } + } + + @Override + public ByteBuf setMedium(int index, int value) { + buffer.setMedium(index, value); + return this; + } + + @Override + public ByteBuf setMediumLE(int index, int value) { + ByteOrder originalOrder = buffer.order(); + try { + buffer.order(ByteOrder.LITTLE_ENDIAN).setMedium(index, value); + return this; + } finally { + buffer.order(originalOrder); + } + } + + @Override + public ByteBuf setInt(int index, int value) { + buffer.setInt(index, value); + return this; + } + + @Override + public ByteBuf setIntLE(int index, int value) { + ByteOrder originalOrder = buffer.order(); + try { + buffer.order(ByteOrder.LITTLE_ENDIAN).setInt(index, value); + return this; + } finally { + buffer.order(originalOrder); + } + } + + @Override + public ByteBuf setLong(int index, long value) { + buffer.setLong(index, value); + return this; + } + + @Override + public ByteBuf setLongLE(int index, long value) { + ByteOrder originalOrder = buffer.order(); + try { + buffer.order(ByteOrder.LITTLE_ENDIAN).setLong(index, value); + return this; + } finally { + buffer.order(originalOrder); + } + } + + @Override + public ByteBuf setChar(int index, int value) { + buffer.setChar(index, (char) value); + return this; + } + + @Override + public ByteBuf setFloat(int index, float value) { + buffer.setFloat(index, value); + return this; + } + + @Override + public ByteBuf setDouble(int index, double value) { + buffer.setDouble(index, value); + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src) { + while (src.isReadable() && index < capacity()) { + setByte(index++, src.readByte()); + } + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int length) { + for (int i = 0; i < length; i++) { + setByte(index + i, src.readByte()); + } + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + for (int i = 0; i < length; i++) { + setByte(index + i, src.getByte(srcIndex + i)); + } + return this; + } + + @Override + public ByteBuf setBytes(int index, byte[] src) { + return setBytes(index, src, 0, src.length); + } + + @Override + public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + for (int i = 0; i < length; i++) { + setByte(index + i, src[srcIndex + i]); + } + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuffer src) { + while (src.hasRemaining()) { + setByte(index, src.get()); + index++; + } + return this; + } + + @Override + public int setBytes(int index, InputStream in, int length) throws IOException { + byte[] bytes = in.readNBytes(length); + setBytes(index, bytes, 0, length); + return bytes.length; + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { + ByteBuffer transfer = ByteBuffer.allocate(length); + int bytes = in.read(transfer); + transfer.flip(); + setBytes(index, transfer); + return bytes; + } + + @Override + public int setBytes(int index, FileChannel in, long position, int length) throws IOException { + ByteBuffer transfer = ByteBuffer.allocate(length); + int bytes = in.read(transfer, position); + transfer.flip(); + setBytes(index, transfer); + return bytes; + } + + @Override + public ByteBuf setZero(int index, int length) { + for (int i = 0; i < length; i++) { + setByte(index + i, 0); + } + return this; + } + + @Override + public int setCharSequence(int index, CharSequence sequence, Charset charset) { + byte[] bytes = sequence.toString().getBytes(charset); + for (int i = 0; i < bytes.length; i++) { + setByte(index + i, bytes[i]); + } + return bytes.length; + } + + @Override + public boolean readBoolean() { + return readByte() != 0; + } + + @Override + public byte readByte() { + return buffer.readByte(); + } + + @Override + public short readUnsignedByte() { + return (short) buffer.readUnsignedByte(); + } + + @Override + public short readShort() { + return buffer.readShort(); + } + + @Override + public short readShortLE() { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).readShort(); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public int readUnsignedShort() { + return buffer.readUnsignedShort(); + } + + @Override + public int readUnsignedShortLE() { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).readUnsignedShort(); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public int readMedium() { + return buffer.readMedium(); + } + + @Override + public int readMediumLE() { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).readMedium(); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public int readUnsignedMedium() { + return buffer.readUnsignedMedium(); + } + + @Override + public int readUnsignedMediumLE() { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).readUnsignedMedium(); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public int readInt() { + return buffer.readInt(); + } + + @Override + public int readIntLE() { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).readInt(); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public long readUnsignedInt() { + return buffer.readUnsignedInt(); + } + + @Override + public long readUnsignedIntLE() { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).readUnsignedInt(); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public long readLong() { + return buffer.readLong(); + } + + @Override + public long readLongLE() { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).readLong(); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public char readChar() { + return buffer.readChar(); + } + + @Override + public float readFloat() { + return buffer.readFloat(); + } + + @Override + public double readDouble() { + return buffer.readDouble(); + } + + @Override + public ByteBuf readBytes(int length) { + Buffer copy = preferredBufferAllocator().allocate(length); + buffer.copyInto(0, copy, 0, length); + return wrap(copy); + } + + @Override + public ByteBuf readSlice(int length) { + return readRetainedSlice(length); + } + + @Override + public ByteBuf readRetainedSlice(int length) { + return slice(readerIndex(), length); + } + + @Override + public ByteBuf readBytes(ByteBuf dst) { + while (dst.isWritable()) { + dst.writeByte(readByte()); + } + return this; + } + + @Override + public ByteBuf readBytes(ByteBuf dst, int length) { + for (int i = 0; i < length; i++) { + dst.writeByte(readByte()); + } + return this; + } + + @Override + public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) { + for (int i = 0; i < length; i++) { + dst.setByte(dstIndex + i, readByte()); + } + return this; + } + + @Override + public ByteBuf readBytes(byte[] dst) { + return readBytes(dst, 0, dst.length); + } + + @Override + public ByteBuf readBytes(byte[] dst, int dstIndex, int length) { + for (int i = 0; i < length; i++) { + dst[dstIndex + i] = readByte(); + } + return this; + } + + @Override + public ByteBuf readBytes(ByteBuffer dst) { + while (dst.hasRemaining()) { + dst.put(readByte()); + } + return this; + } + + @Override + public ByteBuf readBytes(OutputStream out, int length) throws IOException { + for (int i = 0; i < length; i++) { + out.write(readByte()); + } + return this; + } + + @Override + public int readBytes(GatheringByteChannel out, int length) throws IOException { + ByteBuffer[] components = new ByteBuffer[buffer.countReadableComponents()]; + buffer.forEachReadable(0, (i, component) -> { + components[i] = component.readableBuffer(); + return true; + }); + int written = (int) out.write(components); + skipBytes(written); + return written; + } + + @Override + public CharSequence readCharSequence(int length, Charset charset) { + byte[] bytes = new byte[length]; + readBytes(bytes); + return new String(bytes, charset); + } + + @Override + public int readBytes(FileChannel out, long position, int length) throws IOException { + ByteBuffer[] components = new ByteBuffer[buffer.countReadableComponents()]; + buffer.forEachReadable(0, (i, component) -> { + components[i] = component.readableBuffer(); + return true; + }); + int written = 0; + for (ByteBuffer component : components) { + written += out.write(component, position + written); + if (component.hasRemaining()) { + break; + } + } + skipBytes(written); + return written; + } + + @Override + public ByteBuf skipBytes(int length) { + buffer.readerOffset(length + buffer.readerOffset()); + return this; + } + + @Override + public ByteBuf writeBoolean(boolean value) { + return writeByte(value? 1 : 0); + } + + @Override + public ByteBuf writeByte(int value) { + ensureWritable(1); + buffer.writeByte((byte) value); + return this; + } + + @Override + public ByteBuf writeShort(int value) { + ensureWritable(2); + buffer.writeShort((short) value); + return this; + } + + @Override + public ByteBuf writeShortLE(int value) { + ensureWritable(2); + ByteOrder originalOrder = buffer.order(); + try { + buffer.order(ByteOrder.LITTLE_ENDIAN).writeShort((short) value); + return this; + } finally { + buffer.order(originalOrder); + } + } + + @Override + public ByteBuf writeMedium(int value) { + ensureWritable(3); + buffer.writeMedium(value); + return this; + } + + @Override + public ByteBuf writeMediumLE(int value) { + ensureWritable(3); + ByteOrder originalOrder = buffer.order(); + try { + buffer.order(ByteOrder.LITTLE_ENDIAN).writeMedium(value); + return this; + } finally { + buffer.order(originalOrder); + } + } + + @Override + public ByteBuf writeInt(int value) { + ensureWritable(4); + buffer.writeInt(value); + return this; + } + + @Override + public ByteBuf writeIntLE(int value) { + ensureWritable(4); + ByteOrder originalOrder = buffer.order(); + try { + buffer.order(ByteOrder.LITTLE_ENDIAN).writeInt(value); + return this; + } finally { + buffer.order(originalOrder); + } + } + + @Override + public ByteBuf writeLong(long value) { + ensureWritable(8); + buffer.writeLong(value); + return this; + } + + @Override + public ByteBuf writeLongLE(long value) { + ensureWritable(8); + ByteOrder originalOrder = buffer.order(); + try { + buffer.order(ByteOrder.LITTLE_ENDIAN).writeLong(value); + return this; + } finally { + buffer.order(originalOrder); + } + } + + @Override + public ByteBuf writeChar(int value) { + ensureWritable(2); + buffer.writeChar((char) value); + return this; + } + + @Override + public ByteBuf writeFloat(float value) { + ensureWritable(4); + buffer.writeFloat(value); + return this; + } + + @Override + public ByteBuf writeDouble(double value) { + ensureWritable(8); + buffer.writeDouble(value); + return this; + } + + @Override + public ByteBuf writeBytes(ByteBuf src) { + return writeBytes(src, src.readableBytes()); + } + + @Override + public ByteBuf writeBytes(ByteBuf src, int length) { + ensureWritable(length); + for (int i = 0; i < length; i++) { + writeByte(src.readByte()); + } + return this; + } + + @Override + public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) { + ensureWritable(length); + for (int i = 0; i < length; i++) { + writeByte(src.getByte(srcIndex + i)); + } + return this; + } + + @Override + public ByteBuf writeBytes(byte[] src) { + ensureWritable(src.length); + for (byte b : src) { + writeByte(b); + } + return this; + } + + @Override + public ByteBuf writeBytes(byte[] src, int srcIndex, int length) { + ensureWritable(length); + for (int i = 0; i < length; i++) { + writeByte(src[srcIndex + i]); + } + return this; + } + + @Override + public ByteBuf writeBytes(ByteBuffer src) { + ensureWritable(src.remaining()); + while (src.hasRemaining()) { + writeByte(src.get()); + } + return this; + } + + @Override + public int writeBytes(InputStream in, int length) throws IOException { + byte[] bytes = in.readNBytes(length); + writeBytes(bytes); + return bytes.length; + } + + @Override + public int writeBytes(ScatteringByteChannel in, int length) throws IOException { + ensureWritable(length); + ByteBuffer[] components = new ByteBuffer[buffer.countWritableComponents()]; + buffer.forEachWritable(0, (i, component) -> { + components[i] = component.writableBuffer(); + return true; + }); + int read = (int) in.read(components); + if (read > 0) { + writerIndex(read + writerIndex()); + } + return read; + } + + @Override + public int writeBytes(FileChannel in, long position, int length) throws IOException { + ensureWritable(length); + ByteBuffer[] components = new ByteBuffer[buffer.countWritableComponents()]; + buffer.forEachWritable(0, (i, component) -> { + components[i] = component.writableBuffer(); + return true; + }); + int read = 0; + for (ByteBuffer component : components) { + int r = in.read(component, position + read); + if (r > 0) { + read += r; + } + if (component.hasRemaining()) { + break; + } + } + writerIndex(read + writerIndex()); + return read; + } + + @Override + public ByteBuf writeZero(int length) { + ensureWritable(length); + for (int i = 0; i < length; i++) { + writeByte(0); + } + return this; + } + + @Override + public int writeCharSequence(CharSequence sequence, Charset charset) { + byte[] bytes = sequence.toString().getBytes(charset); + writeBytes(bytes); + return bytes.length; + } + + @Override + public int indexOf(int fromIndex, int toIndex, byte value) { + for (int i = fromIndex; i < toIndex; i++) { + if (getByte(i) == value) { + return i; + } + } + return -1; + } + + @Override + public int bytesBefore(byte value) { + return indexOf(readerIndex(), writerIndex(), value); + } + + @Override + public int bytesBefore(int length, byte value) { + return indexOf(readerIndex(), readerIndex() + length, value); + } + + @Override + public int bytesBefore(int index, int length, byte value) { + return indexOf(index, index + length, value); + } + + @Override + public int forEachByte(ByteProcessor processor) { + return buffer.openCursor().process(processor); + } + + @Override + public int forEachByte(int index, int length, ByteProcessor processor) { + return buffer.openCursor(index, length).process(processor); + } + + @Override + public int forEachByteDesc(ByteProcessor processor) { + return buffer.openReverseCursor().process(processor); + } + + @Override + public int forEachByteDesc(int index, int length, ByteProcessor processor) { + return buffer.openReverseCursor(index, length).process(processor); + } + + @Override + public ByteBuf copy() { + return copy(0, capacity()); + } + + @Override + public ByteBuf copy(int index, int length) { + BufferAllocator allocator = preferredBufferAllocator(); + Buffer copy = allocator.allocate(length); + buffer.copyInto(index, copy, 0, length); + copy.order(buffer.order()); + return wrap(copy).setIndex(readerIndex(), writerIndex()); + } + + @Override + public ByteBuf slice() { + return retainedSlice(); + } + + @Override + public ByteBuf retainedSlice() { + return wrap(buffer.slice()); + } + + @Override + public ByteBuf slice(int index, int length) { + return retainedSlice(index, length); + } + + @Override + public ByteBuf retainedSlice(int index, int length) { + return wrap(buffer.slice(index, length)); + } + + @Override + public ByteBuf duplicate() { + return retainedDuplicate(); + } + + @Override + public ByteBuf retainedDuplicate() { + return retainedSlice(0, capacity()); + } + + @Override + public int nioBufferCount() { + return -1; + } + + @Override + public ByteBuffer nioBuffer() { + throw new UnsupportedOperationException("Cannot create shared NIO buffer."); + } + + @Override + public ByteBuffer nioBuffer(int index, int length) { + return nioBuffer(); + } + + @Override + public ByteBuffer internalNioBuffer(int index, int length) { + return nioBuffer(); + } + + @Override + public ByteBuffer[] nioBuffers() { + throw new UnsupportedOperationException("Cannot create shared NIO buffers."); + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + return nioBuffers(); + } + + @Override + public boolean hasArray() { + return false; + } + + @Override + public byte[] array() { + throw new UnsupportedOperationException("This buffer has no array."); + } + + @Override + public int arrayOffset() { + throw new UnsupportedOperationException("This buffer has no array."); + } + + @Override + public boolean hasMemoryAddress() { + return buffer.nativeAddress() != 0; + } + + @Override + public long memoryAddress() { + if (!hasMemoryAddress()) { + throw new UnsupportedOperationException("No memory address associated with this buffer."); + } + return buffer.nativeAddress(); + } + + @Override + public String toString(Charset charset) { + return toString(readerIndex(), readableBytes(), charset); + } + + @Override + public String toString(int index, int length, Charset charset) { + byte[] bytes = new byte[length]; + getBytes(index, bytes); + return new String(bytes, charset); + } + + @Override + public int hashCode() { + int hash = 4242; + int capacity = capacity(); + for (int i = 0; i < capacity; i++) { + hash = 31 * hash + getByte(i); + } + return hash; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ByteBufConvertible) { + ByteBuf other = ((ByteBufConvertible) obj).asByteBuf(); + boolean equal = true; + int capacity = capacity(); + if (other.capacity() != capacity) { + return false; + } + for (int i = 0; i < capacity; i++) { + equal &= getByte(i) == other.getByte(i); + } + return equal; + } + return false; + } + + @Override + public int compareTo(ByteBuf buffer) { + var cap = Math.min(capacity(), buffer.capacity()); + for (int i = 0; i < cap; i++) { + int cmp = Byte.compare(getByte(i), buffer.getByte(i)); + if (cmp != 0) { + return cmp; + } + } + return Integer.compare(capacity(), buffer.capacity()); + } + + @Override + public String toString() { + return "ByteBuf(" + readerIndex() + ", " + writerIndex() + ", " + capacity() + ')'; + } + + @Override + public ByteBuf retain(int increment) { + for (int i = 0; i < increment; i++) { + buffer.acquire(); + } + return this; + } + + @Override + public int refCnt() { + return buffer.isAccessible()? 1 + buffer.countBorrows() : 0; + } + + @Override + public ByteBuf retain() { + return retain(1); + } + + @Override + public ByteBuf touch() { + return this; + } + + @Override + public ByteBuf touch(Object hint) { + return this; + } + + @Override + public boolean release() { + return release(1); + } + + @Override + public boolean release(int decrement) { + for (int i = 0; i < decrement; i++) { + buffer.close(); + } + return !buffer.isAccessible(); + } + + private ByteBufAdaptor wrap(Buffer copy) { + return new ByteBufAdaptor(alloc, copy); + } + + private BufferAllocator preferredBufferAllocator() { + return isDirect()? alloc.getOffHeap() : alloc.getOnHeap(); + } +} diff --git a/src/main/java/io/netty/buffer/api/adaptor/ByteBufAllocatorAdaptor.java b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAllocatorAdaptor.java new file mode 100644 index 0000000..760131b --- /dev/null +++ b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAllocatorAdaptor.java @@ -0,0 +1,159 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.adaptor; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.api.BufferAllocator; + +public class ByteBufAllocatorAdaptor implements ByteBufAllocator, AutoCloseable { + private final BufferAllocator onheap; + private final BufferAllocator offheap; + private boolean closed; + + public ByteBufAllocatorAdaptor() { + this(BufferAllocator.pooledHeap(), BufferAllocator.pooledDirect()); + } + + public ByteBufAllocatorAdaptor(BufferAllocator onheap, BufferAllocator offheap) { + this.onheap = onheap; + this.offheap = offheap; + } + + @Override + public ByteBuf buffer() { + return buffer(256); + } + + public BufferAllocator getOnHeap() { + return onheap; + } + + public BufferAllocator getOffHeap() { + return offheap; + } + + public boolean isClosed() { + return closed; + } + + @Override + public ByteBuf buffer(int initialCapacity) { + return new ByteBufAdaptor(this, onheap.allocate(initialCapacity)); + } + + @Override + public ByteBuf buffer(int initialCapacity, int maxCapacity) { + return buffer(maxCapacity); + } + + @Override + public ByteBuf ioBuffer() { + return directBuffer(); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity) { + return directBuffer(initialCapacity); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) { + return directBuffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf heapBuffer() { + return buffer(); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity) { + return buffer(initialCapacity); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { + return buffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf directBuffer() { + return directBuffer(256); + } + + @Override + public ByteBuf directBuffer(int initialCapacity) { + // TODO we cannot use off-heap buffers here, until the JDK allows direct byte buffers based on native + // memory segments to be used in IO operations. + return new ByteBufAdaptor(this, onheap.allocate(initialCapacity)); + } + + @Override + public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { + return directBuffer(maxCapacity); + } + + @Override + public CompositeByteBuf compositeBuffer() { + return compositeHeapBuffer(); + } + + @Override + public CompositeByteBuf compositeBuffer(int maxNumComponents) { + return compositeHeapBuffer(maxNumComponents); + } + + @Override + public CompositeByteBuf compositeHeapBuffer() { + return compositeHeapBuffer(1024); + } + + @Override + public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { + return new CompositeByteBuf(this, false, maxNumComponents, heapBuffer()); + } + + @Override + public CompositeByteBuf compositeDirectBuffer() { + return compositeDirectBuffer(1024); + } + + @Override + public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { + return new CompositeByteBuf(this, true, maxNumComponents, directBuffer()); + } + + @Override + public boolean isDirectBufferPooled() { + return true; + } + + @Override + public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { + return 0; + } + + @Override + public void close() throws Exception { + try (onheap) { + try (offheap) { + closed = true; + } + } + } +} diff --git a/src/main/java/io/netty/buffer/api/adaptor/package-info.java b/src/main/java/io/netty/buffer/api/adaptor/package-info.java new file mode 100644 index 0000000..1296415 --- /dev/null +++ b/src/main/java/io/netty/buffer/api/adaptor/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Helpers for integrating with the existing {@link io.netty.buffer.ByteBuf} API. + */ +package io.netty.buffer.api.adaptor; diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java index 02fc14e..f68c3fb 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java @@ -15,6 +15,7 @@ */ package io.netty.buffer.api.memseg; +import io.netty.buffer.ByteBuf; import io.netty.buffer.api.BufferAllocator; import io.netty.buffer.api.AllocatorControl; import io.netty.buffer.api.Buffer; @@ -26,6 +27,9 @@ import io.netty.buffer.api.WritableComponentProcessor; import io.netty.buffer.api.Drop; import io.netty.buffer.api.Owned; import io.netty.buffer.api.RcSupport; +import io.netty.buffer.api.adaptor.BufferIntegratable; +import io.netty.buffer.api.adaptor.ByteBufAdaptor; +import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor; import jdk.incubator.foreign.MemorySegment; import java.nio.ByteBuffer; @@ -46,7 +50,8 @@ import static jdk.incubator.foreign.MemoryAccess.setIntAtOffset; import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset; import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset; -class MemSegBuffer extends RcSupport implements Buffer, ReadableComponent, WritableComponent { +class MemSegBuffer extends RcSupport implements Buffer, ReadableComponent, WritableComponent, + BufferIntegratable { private static final MemorySegment CLOSED_SEGMENT; static final Drop SEGMENT_CLOSE; @@ -1138,6 +1143,66 @@ class MemSegBuffer extends RcSupport implements Buffer, Re return new RecoverableMemory(seg, alloc); } + // + private ByteBufAdaptor adaptor; + @Override + public ByteBuf asByteBuf() { + ByteBufAdaptor bba = adaptor; + if (bba == null) { + ByteBufAllocatorAdaptor alloc = new ByteBufAllocatorAdaptor( + BufferAllocator.heap(), BufferAllocator.direct()); + return adaptor = new ByteBufAdaptor(alloc, this); + } + return bba; + } + + @Override + public int readableBytes() { + return writerOffset() - readerOffset(); + } + + @Override + public MemSegBuffer retain(int increment) { + for (int i = 0; i < increment; i++) { + acquire(); + } + return this; + } + + @Override + public int refCnt() { + return isAccessible()? 1 + countBorrows() : 0; + } + + @Override + public MemSegBuffer retain() { + return retain(1); + } + + @Override + public MemSegBuffer touch() { + return this; + } + + @Override + public MemSegBuffer touch(Object hint) { + return this; + } + + @Override + public boolean release() { + return release(1); + } + + @Override + public boolean release(int decrement) { + for (int i = 0; i < decrement; i++) { + close(); + } + return !isAccessible(); + } + // + static final class RecoverableMemory { private final MemorySegment segment; private final AllocatorControl alloc; diff --git a/src/test/java/io/netty/buffer/api/EchoIT.java b/src/test/java/io/netty/buffer/api/EchoIT.java new file mode 100644 index 0000000..c9ea1fb --- /dev/null +++ b/src/test/java/io/netty/buffer/api/EchoIT.java @@ -0,0 +1,150 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor; +import io.netty.buffer.api.examples.echo.EchoServerHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.nio.NioHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import org.junit.jupiter.api.Test; + +import java.net.InetSocketAddress; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class EchoIT { + // In this test we have a server and a client, where the server echos back anything it receives, + // and our client sends a single message to the server, and then verifies that it receives it back. + + @Test + void echoServerMustReplyWithSameData() throws Exception { + ByteBufAllocatorAdaptor allocator = new ByteBufAllocatorAdaptor(); + EventLoopGroup bossGroup = new MultithreadEventLoopGroup(1, NioHandler.newFactory()); + EventLoopGroup workerGroup = new MultithreadEventLoopGroup(NioHandler.newFactory()); + final EchoServerHandler serverHandler = new EchoServerHandler(); + try { + ServerBootstrap server = new ServerBootstrap(); + server.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childOption(ChannelOption.ALLOCATOR, allocator) + .option(ChannelOption.SO_BACKLOG, 100) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(new LoggingHandler(LogLevel.INFO)); + p.addLast(serverHandler); + } + }); + + // Start the server. + ChannelFuture bind = server.bind("localhost", 0).sync(); + InetSocketAddress serverAddress = (InetSocketAddress) bind.channel().localAddress(); + + // Configure the client. + EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory()); + try { + Bootstrap b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(new LoggingHandler(LogLevel.INFO)); + p.addLast(new EchoClientHandler()); + } + }); + + // Start the client. + ChannelFuture f = b.connect(serverAddress).sync(); + + // Wait until the connection is closed. + f.channel().closeFuture().sync(); + } finally { + // Shut down the event loop to terminate all threads. + group.shutdownGracefully(); + } + + // Shut down the server. + bind.channel().close().sync(); + } finally { + // Shut down all event loops to terminate all threads. + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + allocator.close(); + } + } + + static class EchoClientHandler implements ChannelHandler { + private static final int SIZE = 256; + private final Buffer firstMessage; + + /** + * Creates a client-side handler. + */ + EchoClientHandler() { + firstMessage = BufferAllocator.heap().allocate(SIZE); + for (int i = 0; i < SIZE; i++) { + firstMessage.writeByte((byte) i); + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + ctx.writeAndFlush(firstMessage); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ByteBuf buf = (ByteBuf) msg; + assertEquals(SIZE, buf.capacity()); + assertEquals(SIZE, buf.readableBytes()); + for (int i = 0; i < SIZE; i++) { + assertEquals((byte) i, buf.readByte()); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.close(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // Close the connection when an exception is raised. + ctx.close(); + throw new RuntimeException(cause); + } + } +} diff --git a/src/test/java/io/netty/buffer/api/examples/echo/EchoClient.java b/src/test/java/io/netty/buffer/api/examples/echo/EchoClient.java new file mode 100644 index 0000000..2b62c82 --- /dev/null +++ b/src/test/java/io/netty/buffer/api/examples/echo/EchoClient.java @@ -0,0 +1,86 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.examples.echo; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.nio.NioHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + +/** + * Sends one message when a connection is open and echoes back any received + * data to the server. Simply put, the echo client initiates the ping-pong + * traffic between the echo client and server by sending the first message to + * the server. + */ +public final class EchoClient { + + static final boolean SSL = System.getProperty("ssl") != null; + static final String HOST = System.getProperty("host", "127.0.0.1"); + static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); + static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); + + public static void main(String[] args) throws Exception { + // Configure SSL.git + final SslContext sslCtx; + if (SSL) { + sslCtx = SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); + } else { + sslCtx = null; + } + + // Configure the client. + EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory()); + try { + Bootstrap b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + if (sslCtx != null) { + p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); + } + p.addLast(new LoggingHandler(LogLevel.INFO)); + p.addLast(new EchoClientHandler()); + } + }); + + // Start the client. + ChannelFuture f = b.connect(HOST, PORT).sync(); + + // Wait until the connection is closed. + f.channel().closeFuture().sync(); + } finally { + // Shut down the event loop to terminate all threads. + group.shutdownGracefully(); + } + } +} diff --git a/src/test/java/io/netty/buffer/api/examples/echo/EchoClientHandler.java b/src/test/java/io/netty/buffer/api/examples/echo/EchoClientHandler.java new file mode 100644 index 0000000..b66bf37 --- /dev/null +++ b/src/test/java/io/netty/buffer/api/examples/echo/EchoClientHandler.java @@ -0,0 +1,63 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.examples.echo; + +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.BufferAllocator; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; + +/** + * Handler implementation for the echo client. It initiates the ping-pong + * traffic between the echo client and server by sending the first message to + * the server. + */ +public class EchoClientHandler implements ChannelHandler { + + private final Buffer firstMessage; + + /** + * Creates a client-side handler. + */ + public EchoClientHandler() { + firstMessage = BufferAllocator.heap().allocate(EchoClient.SIZE); + for (int i = 0; i < firstMessage.capacity(); i ++) { + firstMessage.writeByte((byte) i); + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + ctx.writeAndFlush(firstMessage); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ctx.write(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // Close the connection when an exception is raised. + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/src/test/java/io/netty/buffer/api/examples/echo/EchoServer.java b/src/test/java/io/netty/buffer/api/examples/echo/EchoServer.java new file mode 100644 index 0000000..ee235d3 --- /dev/null +++ b/src/test/java/io/netty/buffer/api/examples/echo/EchoServer.java @@ -0,0 +1,89 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.examples.echo; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.nio.NioHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.SelfSignedCertificate; + +/** + * Echoes back any received data from a client. + */ +public final class EchoServer { + + static final boolean SSL = System.getProperty("ssl") != null; + static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); + + public static void main(String[] args) throws Exception { + // Configure SSL. + final SslContext sslCtx; + if (SSL) { + SelfSignedCertificate ssc = new SelfSignedCertificate(); + sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); + } else { + sslCtx = null; + } + + // Configure the server. + ByteBufAllocatorAdaptor allocator = new ByteBufAllocatorAdaptor(); + EventLoopGroup bossGroup = new MultithreadEventLoopGroup(1, NioHandler.newFactory()); + EventLoopGroup workerGroup = new MultithreadEventLoopGroup(NioHandler.newFactory()); + final EchoServerHandler serverHandler = new EchoServerHandler(); + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childOption(ChannelOption.ALLOCATOR, allocator) + .option(ChannelOption.SO_BACKLOG, 100) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + if (sslCtx != null) { + p.addLast(sslCtx.newHandler(ch.alloc())); + } + p.addLast(new LoggingHandler(LogLevel.INFO)); + p.addLast(serverHandler); + } + }); + + // Start the server. + ChannelFuture f = b.bind(PORT).sync(); + + // Wait until the server socket is closed. + f.channel().closeFuture().sync(); + } finally { + // Shut down all event loops to terminate all threads. + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + allocator.close(); + } + } +} diff --git a/src/test/java/io/netty/buffer/api/examples/echo/EchoServerHandler.java b/src/test/java/io/netty/buffer/api/examples/echo/EchoServerHandler.java new file mode 100644 index 0000000..bc4306c --- /dev/null +++ b/src/test/java/io/netty/buffer/api/examples/echo/EchoServerHandler.java @@ -0,0 +1,50 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.examples.echo; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.adaptor.ByteBufAdaptor; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; + +/** + * Handler implementation for the echo server. + */ +@Sharable +public class EchoServerHandler implements ChannelHandler { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof ByteBuf) { + // For this example, we only echo back buffers that are using the new buffer API. + Buffer buf = ByteBufAdaptor.extract((ByteBuf) msg); + ctx.write(buf); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // Close the connection when an exception is raised. + cause.printStackTrace(); + ctx.close(); + } +}