From 1b65bf9a23a5da2fb5e28e0ac2251f8f348c5bd1 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Wed, 17 Feb 2021 13:54:11 +0100 Subject: [PATCH] Make the incubating buffers exposable as ByteBuf Motivation: This makes it possible to use the new buffer API in Netty as is. Modification: Make the MemSegBuffer implementation class implement AsByteBuf and ReferenceCounted. The produced ByteBuf instance delegates all calls to the underlying Buffer instance as faithfully as possible. One area where the two deviates, is that it's not possible to create non-retained duplicates and slices with the new buffer API. Result: It is now possible to use the new buffer API on both client and server side. The Echo* examples demonstrate this, and the EchoIT proves it with a test. The API is used more directly on the client side, since the server-side allocator in Netty does not know how to allocate buffers with the incubating API. --- pom.xml | 6 + .../io/netty/buffer/api/BufferHolder.java | 5 + src/main/java/io/netty/buffer/api/Rc.java | 9 + .../java/io/netty/buffer/api/RcSupport.java | 5 + .../api/adaptor/BufferIntegratable.java | 25 + .../buffer/api/adaptor/ByteBufAdaptor.java | 1298 +++++++++++++++++ .../api/adaptor/ByteBufAllocatorAdaptor.java | 157 ++ .../buffer/api/adaptor/package-info.java | 20 + .../netty/buffer/api/memseg/MemSegBuffer.java | 67 +- src/test/java/io/netty/buffer/api/EchoIT.java | 150 ++ .../buffer/api/examples/echo/EchoClient.java | 86 ++ .../api/examples/echo/EchoClientHandler.java | 63 + .../buffer/api/examples/echo/EchoServer.java | 89 ++ .../api/examples/echo/EchoServerHandler.java | 44 + 14 files changed, 2023 insertions(+), 1 deletion(-) create mode 100644 src/main/java/io/netty/buffer/api/adaptor/BufferIntegratable.java create mode 100644 src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java create mode 100644 src/main/java/io/netty/buffer/api/adaptor/ByteBufAllocatorAdaptor.java create mode 100644 src/main/java/io/netty/buffer/api/adaptor/package-info.java create mode 100644 src/test/java/io/netty/buffer/api/EchoIT.java create mode 100644 src/test/java/io/netty/buffer/api/examples/echo/EchoClient.java create mode 100644 src/test/java/io/netty/buffer/api/examples/echo/EchoClientHandler.java create mode 100644 src/test/java/io/netty/buffer/api/examples/echo/EchoServer.java create mode 100644 src/test/java/io/netty/buffer/api/examples/echo/EchoServerHandler.java diff --git a/pom.xml b/pom.xml index a3321b0..49b74f2 100644 --- a/pom.xml +++ b/pom.xml @@ -394,6 +394,12 @@ ${netty.build.version} test + + io.netty + netty-handler + ${netty.version} + test + org.openjdk.jmh jmh-core diff --git a/src/main/java/io/netty/buffer/api/BufferHolder.java b/src/main/java/io/netty/buffer/api/BufferHolder.java index 13f5c01..fd01d31 100644 --- a/src/main/java/io/netty/buffer/api/BufferHolder.java +++ b/src/main/java/io/netty/buffer/api/BufferHolder.java @@ -190,4 +190,9 @@ public abstract class BufferHolder> implements Rc { protected final Buffer getBufVolatile() { return (Buffer) BUF.getVolatile(this); } + + @Override + public boolean isAccessible() { + return buf.isAccessible(); + } } diff --git a/src/main/java/io/netty/buffer/api/Rc.java b/src/main/java/io/netty/buffer/api/Rc.java index 2cc50f7..beec593 100644 --- a/src/main/java/io/netty/buffer/api/Rc.java +++ b/src/main/java/io/netty/buffer/api/Rc.java @@ -87,4 +87,13 @@ public interface Rc> extends AutoCloseable, Deref { * @return The number of borrows, if any, of this object. */ int countBorrows(); + + /** + * Check if this object is accessible. + * + * @return {@code true} if this object is still valid and can be accessed, + * otherwise {@code false} if, for instance, this object has been dropped/deallocated, + * or been {@linkplain #send() sent} elsewhere. + */ + boolean isAccessible(); } diff --git a/src/main/java/io/netty/buffer/api/RcSupport.java b/src/main/java/io/netty/buffer/api/RcSupport.java index 1541810..2d1003f 100644 --- a/src/main/java/io/netty/buffer/api/RcSupport.java +++ b/src/main/java/io/netty/buffer/api/RcSupport.java @@ -102,6 +102,11 @@ public abstract class RcSupport, T extends RcSupport> impl return Math.max(acquires, 0); } + @Override + public boolean isAccessible() { + return acquires >= 0; + } + /** * Prepare this instance for ownsership transfer. This method is called from {@link #send()} in the sending thread. * This method should put this Rc in a deactivated state where it is no longer accessible from the currently owning diff --git a/src/main/java/io/netty/buffer/api/adaptor/BufferIntegratable.java b/src/main/java/io/netty/buffer/api/adaptor/BufferIntegratable.java new file mode 100644 index 0000000..fe9a73d --- /dev/null +++ b/src/main/java/io/netty/buffer/api/adaptor/BufferIntegratable.java @@ -0,0 +1,25 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.adaptor; + +import io.netty.buffer.ByteBufConvertible; +import io.netty.util.ReferenceCounted; + +/** + * Interfaces that are required for an object to stand-in for a {@link io.netty.buffer.ByteBuf} in Netty. + */ +public interface BufferIntegratable extends ByteBufConvertible, ReferenceCounted { +} diff --git a/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java new file mode 100644 index 0000000..e55baee --- /dev/null +++ b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java @@ -0,0 +1,1298 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.adaptor; + +import io.netty.buffer.ByteBufConvertible; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.BufferAllocator; +import io.netty.util.ByteProcessor; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.charset.Charset; + +public class ByteBufAdaptor extends ByteBuf { + private final ByteBufAllocatorAdaptor alloc; + private final Buffer buffer; + + public ByteBufAdaptor(ByteBufAllocatorAdaptor alloc, Buffer buffer) { + this.alloc = alloc; + this.buffer = buffer; + } + + @Override + public int capacity() { + return buffer.capacity(); + } + + @Override + public ByteBuf capacity(int newCapacity) { + int diff = newCapacity - capacity() - buffer.writableBytes(); + if (diff > 0) { + buffer.ensureWritable(diff); + } + return this; + } + + @Override + public int maxCapacity() { + return capacity(); + } + + @Override + public ByteBufAllocator alloc() { + return alloc; + } + + @Override + public ByteOrder order() { + return buffer.order(); + } + + @Override + public ByteBuf order(ByteOrder endianness) { + buffer.order(endianness); + return this; + } + + @Override + public ByteBuf unwrap() { + return null; + } + + @Override + public boolean isDirect() { + return buffer.nativeAddress() != 0; + } + + @Override + public boolean isReadOnly() { + return buffer.readOnly(); + } + + @Override + public ByteBuf asReadOnly() { + return Unpooled.unreleasableBuffer(this); + } + + @Override + public int readerIndex() { + return buffer.readerOffset(); + } + + @Override + public ByteBuf readerIndex(int readerIndex) { + buffer.readerOffset(readerIndex); + return this; + } + + @Override + public int writerIndex() { + return buffer.writerOffset(); + } + + @Override + public ByteBuf writerIndex(int writerIndex) { + buffer.writerOffset(writerIndex); + return this; + } + + @Override + public ByteBuf setIndex(int readerIndex, int writerIndex) { + return readerIndex(readerIndex).writerIndex(writerIndex); + } + + @Override + public int readableBytes() { + return buffer.readableBytes(); + } + + @Override + public int writableBytes() { + return buffer.writableBytes(); + } + + @Override + public int maxWritableBytes() { + return writableBytes(); + } + + @Override + public boolean isReadable() { + return readableBytes() > 0; + } + + @Override + public boolean isReadable(int size) { + return readableBytes() >= size; + } + + @Override + public boolean isWritable() { + return writableBytes() > 0; + } + + @Override + public boolean isWritable(int size) { + return writableBytes() >= size; + } + + @Override + public ByteBuf clear() { + return setIndex(0, 0); + } + + @Override + public ByteBuf discardReadBytes() { + buffer.compact(); + return this; + } + + @Override + public ByteBuf discardSomeReadBytes() { + return discardReadBytes(); + } + + @Override + public ByteBuf ensureWritable(int minWritableBytes) { + buffer.ensureWritable(minWritableBytes); + return this; + } + + @Override + public int ensureWritable(int minWritableBytes, boolean force) { + buffer.ensureWritable(minWritableBytes); + return minWritableBytes; + } + + @Override + public boolean getBoolean(int index) { + return getByte(index) != 0; + } + + @Override + public byte getByte(int index) { + return buffer.getByte(index); + } + + @Override + public short getUnsignedByte(int index) { + return (short) buffer.getUnsignedByte(index); + } + + @Override + public short getShort(int index) { + return buffer.getShort(index); + } + + @Override + public short getShortLE(int index) { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).getShort(index); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public int getUnsignedShort(int index) { + return buffer.getUnsignedShort(index); + } + + @Override + public int getUnsignedShortLE(int index) { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).getUnsignedShort(index); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public int getMedium(int index) { + return buffer.getMedium(index); + } + + @Override + public int getMediumLE(int index) { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).getMedium(index); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public int getUnsignedMedium(int index) { + return buffer.getUnsignedMedium(index); + } + + @Override + public int getUnsignedMediumLE(int index) { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).getUnsignedMedium(index); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public int getInt(int index) { + return buffer.getInt(index); + } + + @Override + public int getIntLE(int index) { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).getInt(index); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public long getUnsignedInt(int index) { + return buffer.getUnsignedInt(index); + } + + @Override + public long getUnsignedIntLE(int index) { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).getUnsignedInt(index); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public long getLong(int index) { + return buffer.getLong(index); + } + + @Override + public long getLongLE(int index) { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).getLong(index); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public char getChar(int index) { + return buffer.getChar(index); + } + + @Override + public float getFloat(int index) { + return buffer.getFloat(index); + } + + @Override + public double getDouble(int index) { + return buffer.getDouble(index); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst) { + while (dst.isWritable()) { + dst.writeByte(getByte(index++)); + } + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int length) { + for (int i = 0; i < length; i++) { + dst.writeByte(getByte(index + i)); + } + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + for (int i = 0; i < length; i++) { + dst.setByte(dstIndex + i, getByte(index + i)); + } + return this; + } + + @Override + public ByteBuf getBytes(int index, byte[] dst) { + return getBytes(index, dst, 0, dst.length); + } + + @Override + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + for (int i = 0; i < length; i++) { + dst[dstIndex + i] = getByte(index + i); + } + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + while (dst.hasRemaining()) { + dst.put(getByte(index)); + index++; + } + return this; + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + for (int i = 0; i < length; i++) { + out.write(getByte(index + i)); + } + return this; + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { + ByteBuffer transfer = ByteBuffer.allocate(length); + buffer.copyInto(index, transfer, 0, length); + return out.write(transfer); + } + + @Override + public int getBytes(int index, FileChannel out, long position, int length) throws IOException { + ByteBuffer transfer = ByteBuffer.allocate(length); + buffer.copyInto(index, transfer, 0, length); + return out.write(transfer, position); + } + + @Override + public CharSequence getCharSequence(int index, int length, Charset charset) { + byte[] bytes = new byte[length]; + getBytes(index, bytes); + return new String(bytes, charset); + } + + @Override + public ByteBuf setBoolean(int index, boolean value) { + return setByte(index, value? 1 : 0); + } + + @Override + public ByteBuf setByte(int index, int value) { + buffer.setByte(index, (byte) value); + return this; + } + + @Override + public ByteBuf setShort(int index, int value) { + buffer.setShort(index, (short) value); + return this; + } + + @Override + public ByteBuf setShortLE(int index, int value) { + ByteOrder originalOrder = buffer.order(); + try { + buffer.order(ByteOrder.LITTLE_ENDIAN).setShort(index, (short) value); + return this; + } finally { + buffer.order(originalOrder); + } + } + + @Override + public ByteBuf setMedium(int index, int value) { + buffer.setMedium(index, value); + return this; + } + + @Override + public ByteBuf setMediumLE(int index, int value) { + ByteOrder originalOrder = buffer.order(); + try { + buffer.order(ByteOrder.LITTLE_ENDIAN).setMedium(index, value); + return this; + } finally { + buffer.order(originalOrder); + } + } + + @Override + public ByteBuf setInt(int index, int value) { + buffer.setInt(index, value); + return this; + } + + @Override + public ByteBuf setIntLE(int index, int value) { + ByteOrder originalOrder = buffer.order(); + try { + buffer.order(ByteOrder.LITTLE_ENDIAN).setInt(index, value); + return this; + } finally { + buffer.order(originalOrder); + } + } + + @Override + public ByteBuf setLong(int index, long value) { + buffer.setLong(index, value); + return this; + } + + @Override + public ByteBuf setLongLE(int index, long value) { + ByteOrder originalOrder = buffer.order(); + try { + buffer.order(ByteOrder.LITTLE_ENDIAN).setLong(index, value); + return this; + } finally { + buffer.order(originalOrder); + } + } + + @Override + public ByteBuf setChar(int index, int value) { + buffer.setChar(index, (char) value); + return this; + } + + @Override + public ByteBuf setFloat(int index, float value) { + buffer.setFloat(index, value); + return this; + } + + @Override + public ByteBuf setDouble(int index, double value) { + buffer.setDouble(index, value); + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src) { + while (src.isReadable() && index < capacity()) { + setByte(index++, src.readByte()); + } + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int length) { + for (int i = 0; i < length; i++) { + setByte(index + i, src.readByte()); + } + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + for (int i = 0; i < length; i++) { + setByte(index + i, src.getByte(srcIndex + i)); + } + return this; + } + + @Override + public ByteBuf setBytes(int index, byte[] src) { + return setBytes(index, src, 0, src.length); + } + + @Override + public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + for (int i = 0; i < length; i++) { + setByte(index + i, src[srcIndex + i]); + } + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuffer src) { + while (src.hasRemaining()) { + setByte(index, src.get()); + index++; + } + return this; + } + + @Override + public int setBytes(int index, InputStream in, int length) throws IOException { + byte[] bytes = in.readNBytes(length); + setBytes(index, bytes, 0, length); + return bytes.length; + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { + ByteBuffer transfer = ByteBuffer.allocate(length); + int bytes = in.read(transfer); + transfer.flip(); + setBytes(index, transfer); + return bytes; + } + + @Override + public int setBytes(int index, FileChannel in, long position, int length) throws IOException { + ByteBuffer transfer = ByteBuffer.allocate(length); + int bytes = in.read(transfer, position); + transfer.flip(); + setBytes(index, transfer); + return bytes; + } + + @Override + public ByteBuf setZero(int index, int length) { + for (int i = 0; i < length; i++) { + setByte(index + i, 0); + } + return this; + } + + @Override + public int setCharSequence(int index, CharSequence sequence, Charset charset) { + byte[] bytes = sequence.toString().getBytes(charset); + for (int i = 0; i < bytes.length; i++) { + setByte(index + i, bytes[i]); + } + return bytes.length; + } + + @Override + public boolean readBoolean() { + return readByte() != 0; + } + + @Override + public byte readByte() { + return buffer.readByte(); + } + + @Override + public short readUnsignedByte() { + return (short) buffer.readUnsignedByte(); + } + + @Override + public short readShort() { + return buffer.readShort(); + } + + @Override + public short readShortLE() { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).readShort(); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public int readUnsignedShort() { + return buffer.readUnsignedShort(); + } + + @Override + public int readUnsignedShortLE() { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).readUnsignedShort(); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public int readMedium() { + return buffer.readMedium(); + } + + @Override + public int readMediumLE() { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).readMedium(); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public int readUnsignedMedium() { + return buffer.readUnsignedMedium(); + } + + @Override + public int readUnsignedMediumLE() { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).readUnsignedMedium(); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public int readInt() { + return buffer.readInt(); + } + + @Override + public int readIntLE() { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).readInt(); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public long readUnsignedInt() { + return buffer.readUnsignedInt(); + } + + @Override + public long readUnsignedIntLE() { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).readUnsignedInt(); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public long readLong() { + return buffer.readLong(); + } + + @Override + public long readLongLE() { + ByteOrder originalOrder = buffer.order(); + try { + return buffer.order(ByteOrder.LITTLE_ENDIAN).readLong(); + } finally { + buffer.order(originalOrder); + } + } + + @Override + public char readChar() { + return buffer.readChar(); + } + + @Override + public float readFloat() { + return buffer.readFloat(); + } + + @Override + public double readDouble() { + return buffer.readDouble(); + } + + @Override + public ByteBuf readBytes(int length) { + Buffer copy = preferredBufferAllocator().allocate(length); + buffer.copyInto(0, copy, 0, length); + return wrap(copy); + } + + @Override + public ByteBuf readSlice(int length) { + return readRetainedSlice(length); + } + + @Override + public ByteBuf readRetainedSlice(int length) { + return slice(readerIndex(), length); + } + + @Override + public ByteBuf readBytes(ByteBuf dst) { + while (dst.isWritable()) { + dst.writeByte(readByte()); + } + return this; + } + + @Override + public ByteBuf readBytes(ByteBuf dst, int length) { + for (int i = 0; i < length; i++) { + dst.writeByte(readByte()); + } + return this; + } + + @Override + public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) { + for (int i = 0; i < length; i++) { + dst.setByte(dstIndex + i, readByte()); + } + return this; + } + + @Override + public ByteBuf readBytes(byte[] dst) { + return readBytes(dst, 0, dst.length); + } + + @Override + public ByteBuf readBytes(byte[] dst, int dstIndex, int length) { + for (int i = 0; i < length; i++) { + dst[dstIndex + i] = readByte(); + } + return this; + } + + @Override + public ByteBuf readBytes(ByteBuffer dst) { + while (dst.hasRemaining()) { + dst.put(readByte()); + } + return this; + } + + @Override + public ByteBuf readBytes(OutputStream out, int length) throws IOException { + for (int i = 0; i < length; i++) { + out.write(readByte()); + } + return this; + } + + @Override + public int readBytes(GatheringByteChannel out, int length) throws IOException { + ByteBuffer[] components = new ByteBuffer[buffer.countReadableComponents()]; + buffer.forEachReadable(0, (i, component) -> { + components[i] = component.readableBuffer(); + return true; + }); + int written = (int) out.write(components); + skipBytes(written); + return written; + } + + @Override + public CharSequence readCharSequence(int length, Charset charset) { + byte[] bytes = new byte[length]; + readBytes(bytes); + return new String(bytes, charset); + } + + @Override + public int readBytes(FileChannel out, long position, int length) throws IOException { + ByteBuffer[] components = new ByteBuffer[buffer.countReadableComponents()]; + buffer.forEachReadable(0, (i, component) -> { + components[i] = component.readableBuffer(); + return true; + }); + int written = 0; + for (ByteBuffer component : components) { + written += out.write(component, position + written); + if (component.hasRemaining()) { + break; + } + } + skipBytes(written); + return written; + } + + @Override + public ByteBuf skipBytes(int length) { + buffer.readerOffset(length + buffer.readerOffset()); + return this; + } + + @Override + public ByteBuf writeBoolean(boolean value) { + return writeByte(value? 1 : 0); + } + + @Override + public ByteBuf writeByte(int value) { + ensureWritable(1); + buffer.writeByte((byte) value); + return this; + } + + @Override + public ByteBuf writeShort(int value) { + ensureWritable(2); + buffer.writeShort((short) value); + return this; + } + + @Override + public ByteBuf writeShortLE(int value) { + ensureWritable(2); + ByteOrder originalOrder = buffer.order(); + try { + buffer.order(ByteOrder.LITTLE_ENDIAN).writeShort((short) value); + return this; + } finally { + buffer.order(originalOrder); + } + } + + @Override + public ByteBuf writeMedium(int value) { + ensureWritable(3); + buffer.writeMedium(value); + return this; + } + + @Override + public ByteBuf writeMediumLE(int value) { + ensureWritable(3); + ByteOrder originalOrder = buffer.order(); + try { + buffer.order(ByteOrder.LITTLE_ENDIAN).writeMedium(value); + return this; + } finally { + buffer.order(originalOrder); + } + } + + @Override + public ByteBuf writeInt(int value) { + ensureWritable(4); + buffer.writeInt(value); + return this; + } + + @Override + public ByteBuf writeIntLE(int value) { + ensureWritable(4); + ByteOrder originalOrder = buffer.order(); + try { + buffer.order(ByteOrder.LITTLE_ENDIAN).writeInt(value); + return this; + } finally { + buffer.order(originalOrder); + } + } + + @Override + public ByteBuf writeLong(long value) { + ensureWritable(8); + buffer.writeLong(value); + return this; + } + + @Override + public ByteBuf writeLongLE(long value) { + ensureWritable(8); + ByteOrder originalOrder = buffer.order(); + try { + buffer.order(ByteOrder.LITTLE_ENDIAN).writeLong(value); + return this; + } finally { + buffer.order(originalOrder); + } + } + + @Override + public ByteBuf writeChar(int value) { + ensureWritable(2); + buffer.writeChar((char) value); + return this; + } + + @Override + public ByteBuf writeFloat(float value) { + ensureWritable(4); + buffer.writeFloat(value); + return this; + } + + @Override + public ByteBuf writeDouble(double value) { + ensureWritable(8); + buffer.writeDouble(value); + return this; + } + + @Override + public ByteBuf writeBytes(ByteBuf src) { + return writeBytes(src, src.readableBytes()); + } + + @Override + public ByteBuf writeBytes(ByteBuf src, int length) { + ensureWritable(length); + for (int i = 0; i < length; i++) { + writeByte(src.readByte()); + } + return this; + } + + @Override + public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) { + ensureWritable(length); + for (int i = 0; i < length; i++) { + writeByte(src.getByte(srcIndex + i)); + } + return this; + } + + @Override + public ByteBuf writeBytes(byte[] src) { + ensureWritable(src.length); + for (byte b : src) { + writeByte(b); + } + return this; + } + + @Override + public ByteBuf writeBytes(byte[] src, int srcIndex, int length) { + ensureWritable(length); + for (int i = 0; i < length; i++) { + writeByte(src[srcIndex + i]); + } + return this; + } + + @Override + public ByteBuf writeBytes(ByteBuffer src) { + ensureWritable(src.remaining()); + while (src.hasRemaining()) { + writeByte(src.get()); + } + return this; + } + + @Override + public int writeBytes(InputStream in, int length) throws IOException { + byte[] bytes = in.readNBytes(length); + writeBytes(bytes); + return bytes.length; + } + + @Override + public int writeBytes(ScatteringByteChannel in, int length) throws IOException { + ensureWritable(length); + ByteBuffer[] components = new ByteBuffer[buffer.countWritableComponents()]; + buffer.forEachWritable(0, (i, component) -> { + components[i] = component.writableBuffer(); + return true; + }); + int read = (int) in.read(components); + writerIndex(read + writerIndex()); + return read; + } + + @Override + public int writeBytes(FileChannel in, long position, int length) throws IOException { + ensureWritable(length); + ByteBuffer[] components = new ByteBuffer[buffer.countWritableComponents()]; + buffer.forEachWritable(0, (i, component) -> { + components[i] = component.writableBuffer(); + return true; + }); + int read = 0; + for (ByteBuffer component : components) { + read += in.read(component, position + read); + if (component.hasRemaining()) { + break; + } + } + writerIndex(read + writerIndex()); + return read; + } + + @Override + public ByteBuf writeZero(int length) { + ensureWritable(length); + for (int i = 0; i < length; i++) { + writeByte(0); + } + return this; + } + + @Override + public int writeCharSequence(CharSequence sequence, Charset charset) { + byte[] bytes = sequence.toString().getBytes(charset); + writeBytes(bytes); + return bytes.length; + } + + @Override + public int indexOf(int fromIndex, int toIndex, byte value) { + for (int i = fromIndex; i < toIndex; i++) { + if (getByte(i) == value) { + return i; + } + } + return -1; + } + + @Override + public int bytesBefore(byte value) { + return indexOf(readerIndex(), writerIndex(), value); + } + + @Override + public int bytesBefore(int length, byte value) { + return indexOf(readerIndex(), readerIndex() + length, value); + } + + @Override + public int bytesBefore(int index, int length, byte value) { + return indexOf(index, index + length, value); + } + + @Override + public int forEachByte(ByteProcessor processor) { + return buffer.openCursor().process(processor); + } + + @Override + public int forEachByte(int index, int length, ByteProcessor processor) { + return buffer.openCursor(index, length).process(processor); + } + + @Override + public int forEachByteDesc(ByteProcessor processor) { + return buffer.openReverseCursor().process(processor); + } + + @Override + public int forEachByteDesc(int index, int length, ByteProcessor processor) { + return buffer.openReverseCursor(index, length).process(processor); + } + + @Override + public ByteBuf copy() { + return copy(0, capacity()); + } + + @Override + public ByteBuf copy(int index, int length) { + BufferAllocator allocator = preferredBufferAllocator(); + Buffer copy = allocator.allocate(length); + buffer.copyInto(index, copy, 0, length); + copy.order(buffer.order()); + return wrap(copy).setIndex(readerIndex(), writerIndex()); + } + + @Override + public ByteBuf slice() { + return retainedSlice(); + } + + @Override + public ByteBuf retainedSlice() { + return wrap(buffer.slice()); + } + + @Override + public ByteBuf slice(int index, int length) { + return retainedSlice(index, length); + } + + @Override + public ByteBuf retainedSlice(int index, int length) { + return wrap(buffer.slice(index, length)); + } + + @Override + public ByteBuf duplicate() { + return retainedDuplicate(); + } + + @Override + public ByteBuf retainedDuplicate() { + return retainedSlice(0, capacity()); + } + + @Override + public int nioBufferCount() { + return -1; + } + + @Override + public ByteBuffer nioBuffer() { + throw new UnsupportedOperationException("Cannot create shared NIO buffer."); + } + + @Override + public ByteBuffer nioBuffer(int index, int length) { + return nioBuffer(); + } + + @Override + public ByteBuffer internalNioBuffer(int index, int length) { + return nioBuffer(); + } + + @Override + public ByteBuffer[] nioBuffers() { + throw new UnsupportedOperationException("Cannot create shared NIO buffers."); + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + return nioBuffers(); + } + + @Override + public boolean hasArray() { + return false; + } + + @Override + public byte[] array() { + throw new UnsupportedOperationException("This buffer has no array."); + } + + @Override + public int arrayOffset() { + throw new UnsupportedOperationException("This buffer has no array."); + } + + @Override + public boolean hasMemoryAddress() { + return buffer.nativeAddress() != 0; + } + + @Override + public long memoryAddress() { + if (!hasMemoryAddress()) { + throw new UnsupportedOperationException("No memory address associated with this buffer."); + } + return buffer.nativeAddress(); + } + + @Override + public String toString(Charset charset) { + return toString(readerIndex(), readableBytes(), charset); + } + + @Override + public String toString(int index, int length, Charset charset) { + byte[] bytes = new byte[length]; + getBytes(index, bytes); + return new String(bytes, charset); + } + + @Override + public int hashCode() { + int hash = 4242; + int capacity = capacity(); + for (int i = 0; i < capacity; i++) { + hash = 31 * hash + getByte(i); + } + return hash; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ByteBufConvertible) { + ByteBuf other = ((ByteBufConvertible) obj).asByteBuf(); + boolean equal = true; + int capacity = capacity(); + if (other.capacity() != capacity) { + return false; + } + for (int i = 0; i < capacity; i++) { + equal &= getByte(i) == other.getByte(i); + } + return equal; + } + return false; + } + + @Override + public int compareTo(ByteBuf buffer) { + var cap = Math.min(capacity(), buffer.capacity()); + for (int i = 0; i < cap; i++) { + int cmp = Byte.compare(getByte(i), buffer.getByte(i)); + if (cmp != 0) { + return cmp; + } + } + return Integer.compare(capacity(), buffer.capacity()); + } + + @Override + public String toString() { + return "ByteBuf(" + readerIndex() + ", " + writerIndex() + ", " + capacity() + ')'; + } + + @Override + public ByteBuf retain(int increment) { + for (int i = 0; i < increment; i++) { + buffer.acquire(); + } + return this; + } + + @Override + public int refCnt() { + return buffer.isAccessible()? 1 + buffer.countBorrows() : 0; + } + + @Override + public ByteBuf retain() { + return retain(1); + } + + @Override + public ByteBuf touch() { + return this; + } + + @Override + public ByteBuf touch(Object hint) { + return this; + } + + @Override + public boolean release() { + return release(1); + } + + @Override + public boolean release(int decrement) { + for (int i = 0; i < decrement; i++) { + buffer.close(); + } + return !buffer.isAccessible(); + } + + private ByteBufAdaptor wrap(Buffer copy) { + return new ByteBufAdaptor(alloc, copy); + } + + private BufferAllocator preferredBufferAllocator() { + return isDirect()? alloc.getOffHeap() : alloc.getOnHeap(); + } +} diff --git a/src/main/java/io/netty/buffer/api/adaptor/ByteBufAllocatorAdaptor.java b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAllocatorAdaptor.java new file mode 100644 index 0000000..22e4d0d --- /dev/null +++ b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAllocatorAdaptor.java @@ -0,0 +1,157 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.adaptor; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.api.BufferAllocator; + +public class ByteBufAllocatorAdaptor implements ByteBufAllocator, AutoCloseable { + private final BufferAllocator onheap; + private final BufferAllocator offheap; + private boolean closed; + + public ByteBufAllocatorAdaptor() { + this(BufferAllocator.pooledHeap(), BufferAllocator.pooledDirect()); + } + + public ByteBufAllocatorAdaptor(BufferAllocator onheap, BufferAllocator offheap) { + this.onheap = onheap; + this.offheap = offheap; + } + + @Override + public ByteBuf buffer() { + return buffer(256); + } + + public BufferAllocator getOnHeap() { + return onheap; + } + + public BufferAllocator getOffHeap() { + return offheap; + } + + public boolean isClosed() { + return closed; + } + + @Override + public ByteBuf buffer(int initialCapacity) { + return new ByteBufAdaptor(this, onheap.allocate(initialCapacity)); + } + + @Override + public ByteBuf buffer(int initialCapacity, int maxCapacity) { + return buffer(maxCapacity); + } + + @Override + public ByteBuf ioBuffer() { + return directBuffer(); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity) { + return directBuffer(initialCapacity); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) { + return directBuffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf heapBuffer() { + return buffer(); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity) { + return buffer(initialCapacity); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { + return buffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf directBuffer() { + return directBuffer(256); + } + + @Override + public ByteBuf directBuffer(int initialCapacity) { + return new ByteBufAdaptor(this, offheap.allocate(initialCapacity)); + } + + @Override + public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { + return directBuffer(maxCapacity); + } + + @Override + public CompositeByteBuf compositeBuffer() { + return compositeHeapBuffer(); + } + + @Override + public CompositeByteBuf compositeBuffer(int maxNumComponents) { + return compositeHeapBuffer(maxNumComponents); + } + + @Override + public CompositeByteBuf compositeHeapBuffer() { + return compositeHeapBuffer(1024); + } + + @Override + public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { + return new CompositeByteBuf(this, false, maxNumComponents, heapBuffer()); + } + + @Override + public CompositeByteBuf compositeDirectBuffer() { + return compositeDirectBuffer(1024); + } + + @Override + public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { + return new CompositeByteBuf(this, true, maxNumComponents, directBuffer()); + } + + @Override + public boolean isDirectBufferPooled() { + return true; + } + + @Override + public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { + return 0; + } + + @Override + public void close() throws Exception { + try (onheap) { + try (offheap) { + closed = true; + } + } + } +} diff --git a/src/main/java/io/netty/buffer/api/adaptor/package-info.java b/src/main/java/io/netty/buffer/api/adaptor/package-info.java new file mode 100644 index 0000000..1296415 --- /dev/null +++ b/src/main/java/io/netty/buffer/api/adaptor/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Helpers for integrating with the existing {@link io.netty.buffer.ByteBuf} API. + */ +package io.netty.buffer.api.adaptor; diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java index 02fc14e..f68c3fb 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java @@ -15,6 +15,7 @@ */ package io.netty.buffer.api.memseg; +import io.netty.buffer.ByteBuf; import io.netty.buffer.api.BufferAllocator; import io.netty.buffer.api.AllocatorControl; import io.netty.buffer.api.Buffer; @@ -26,6 +27,9 @@ import io.netty.buffer.api.WritableComponentProcessor; import io.netty.buffer.api.Drop; import io.netty.buffer.api.Owned; import io.netty.buffer.api.RcSupport; +import io.netty.buffer.api.adaptor.BufferIntegratable; +import io.netty.buffer.api.adaptor.ByteBufAdaptor; +import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor; import jdk.incubator.foreign.MemorySegment; import java.nio.ByteBuffer; @@ -46,7 +50,8 @@ import static jdk.incubator.foreign.MemoryAccess.setIntAtOffset; import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset; import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset; -class MemSegBuffer extends RcSupport implements Buffer, ReadableComponent, WritableComponent { +class MemSegBuffer extends RcSupport implements Buffer, ReadableComponent, WritableComponent, + BufferIntegratable { private static final MemorySegment CLOSED_SEGMENT; static final Drop SEGMENT_CLOSE; @@ -1138,6 +1143,66 @@ class MemSegBuffer extends RcSupport implements Buffer, Re return new RecoverableMemory(seg, alloc); } + // + private ByteBufAdaptor adaptor; + @Override + public ByteBuf asByteBuf() { + ByteBufAdaptor bba = adaptor; + if (bba == null) { + ByteBufAllocatorAdaptor alloc = new ByteBufAllocatorAdaptor( + BufferAllocator.heap(), BufferAllocator.direct()); + return adaptor = new ByteBufAdaptor(alloc, this); + } + return bba; + } + + @Override + public int readableBytes() { + return writerOffset() - readerOffset(); + } + + @Override + public MemSegBuffer retain(int increment) { + for (int i = 0; i < increment; i++) { + acquire(); + } + return this; + } + + @Override + public int refCnt() { + return isAccessible()? 1 + countBorrows() : 0; + } + + @Override + public MemSegBuffer retain() { + return retain(1); + } + + @Override + public MemSegBuffer touch() { + return this; + } + + @Override + public MemSegBuffer touch(Object hint) { + return this; + } + + @Override + public boolean release() { + return release(1); + } + + @Override + public boolean release(int decrement) { + for (int i = 0; i < decrement; i++) { + close(); + } + return !isAccessible(); + } + // + static final class RecoverableMemory { private final MemorySegment segment; private final AllocatorControl alloc; diff --git a/src/test/java/io/netty/buffer/api/EchoIT.java b/src/test/java/io/netty/buffer/api/EchoIT.java new file mode 100644 index 0000000..67b2a13 --- /dev/null +++ b/src/test/java/io/netty/buffer/api/EchoIT.java @@ -0,0 +1,150 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor; +import io.netty.buffer.api.examples.echo.EchoServerHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.nio.NioHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import org.junit.jupiter.api.Test; + +import java.net.InetSocketAddress; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class EchoIT { + // In this test we have a server and a client, where the server echos back anything it receives, + // and our client sends a single message to the server, and then verifies that it receives it back. + + @Test + void echoServerMustReplyWithSameData() throws Exception { + ByteBufAllocatorAdaptor allocator = new ByteBufAllocatorAdaptor(); + EventLoopGroup bossGroup = new MultithreadEventLoopGroup(1, NioHandler.newFactory()); + EventLoopGroup workerGroup = new MultithreadEventLoopGroup(NioHandler.newFactory()); + final EchoServerHandler serverHandler = new EchoServerHandler(); + try { + ServerBootstrap server = new ServerBootstrap(); + server.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.ALLOCATOR, allocator) + .option(ChannelOption.SO_BACKLOG, 100) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(new LoggingHandler(LogLevel.INFO)); + p.addLast(serverHandler); + } + }); + + // Start the server. + ChannelFuture bind = server.bind("localhost", 0).sync(); + InetSocketAddress serverAddress = (InetSocketAddress) bind.channel().localAddress(); + + // Configure the client. + EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory()); + try { + Bootstrap b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(new LoggingHandler(LogLevel.INFO)); + p.addLast(new EchoClientHandler()); + } + }); + + // Start the client. + ChannelFuture f = b.connect(serverAddress).sync(); + + // Wait until the connection is closed. + f.channel().closeFuture().sync(); + } finally { + // Shut down the event loop to terminate all threads. + group.shutdownGracefully(); + } + + // Shut down the server. + bind.channel().close().sync(); + } finally { + // Shut down all event loops to terminate all threads. + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + allocator.close(); + } + } + + static class EchoClientHandler implements ChannelHandler { + private static final int SIZE = 256; + private final Buffer firstMessage; + + /** + * Creates a client-side handler. + */ + EchoClientHandler() { + firstMessage = BufferAllocator.heap().allocate(SIZE); + for (int i = 0; i < SIZE; i ++) { + firstMessage.writeByte((byte) i); + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + ctx.writeAndFlush(firstMessage); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ByteBuf buf = (ByteBuf) msg; + assertEquals(SIZE, buf.capacity()); + assertEquals(SIZE, buf.readableBytes()); + for (int i = 0; i < SIZE; i++) { + assertEquals((byte) i, buf.readByte()); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.close(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // Close the connection when an exception is raised. + ctx.close(); + throw new RuntimeException(cause); + } + } +} diff --git a/src/test/java/io/netty/buffer/api/examples/echo/EchoClient.java b/src/test/java/io/netty/buffer/api/examples/echo/EchoClient.java new file mode 100644 index 0000000..2b62c82 --- /dev/null +++ b/src/test/java/io/netty/buffer/api/examples/echo/EchoClient.java @@ -0,0 +1,86 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.examples.echo; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.nio.NioHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + +/** + * Sends one message when a connection is open and echoes back any received + * data to the server. Simply put, the echo client initiates the ping-pong + * traffic between the echo client and server by sending the first message to + * the server. + */ +public final class EchoClient { + + static final boolean SSL = System.getProperty("ssl") != null; + static final String HOST = System.getProperty("host", "127.0.0.1"); + static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); + static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); + + public static void main(String[] args) throws Exception { + // Configure SSL.git + final SslContext sslCtx; + if (SSL) { + sslCtx = SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); + } else { + sslCtx = null; + } + + // Configure the client. + EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory()); + try { + Bootstrap b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + if (sslCtx != null) { + p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); + } + p.addLast(new LoggingHandler(LogLevel.INFO)); + p.addLast(new EchoClientHandler()); + } + }); + + // Start the client. + ChannelFuture f = b.connect(HOST, PORT).sync(); + + // Wait until the connection is closed. + f.channel().closeFuture().sync(); + } finally { + // Shut down the event loop to terminate all threads. + group.shutdownGracefully(); + } + } +} diff --git a/src/test/java/io/netty/buffer/api/examples/echo/EchoClientHandler.java b/src/test/java/io/netty/buffer/api/examples/echo/EchoClientHandler.java new file mode 100644 index 0000000..b66bf37 --- /dev/null +++ b/src/test/java/io/netty/buffer/api/examples/echo/EchoClientHandler.java @@ -0,0 +1,63 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.examples.echo; + +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.BufferAllocator; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; + +/** + * Handler implementation for the echo client. It initiates the ping-pong + * traffic between the echo client and server by sending the first message to + * the server. + */ +public class EchoClientHandler implements ChannelHandler { + + private final Buffer firstMessage; + + /** + * Creates a client-side handler. + */ + public EchoClientHandler() { + firstMessage = BufferAllocator.heap().allocate(EchoClient.SIZE); + for (int i = 0; i < firstMessage.capacity(); i ++) { + firstMessage.writeByte((byte) i); + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + ctx.writeAndFlush(firstMessage); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ctx.write(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // Close the connection when an exception is raised. + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/src/test/java/io/netty/buffer/api/examples/echo/EchoServer.java b/src/test/java/io/netty/buffer/api/examples/echo/EchoServer.java new file mode 100644 index 0000000..6f28e29 --- /dev/null +++ b/src/test/java/io/netty/buffer/api/examples/echo/EchoServer.java @@ -0,0 +1,89 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.examples.echo; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.nio.NioHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.SelfSignedCertificate; + +/** + * Echoes back any received data from a client. + */ +public final class EchoServer { + + static final boolean SSL = System.getProperty("ssl") != null; + static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); + + public static void main(String[] args) throws Exception { + // Configure SSL. + final SslContext sslCtx; + if (SSL) { + SelfSignedCertificate ssc = new SelfSignedCertificate(); + sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); + } else { + sslCtx = null; + } + + // Configure the server. + ByteBufAllocatorAdaptor allocator = new ByteBufAllocatorAdaptor(); + EventLoopGroup bossGroup = new MultithreadEventLoopGroup(1, NioHandler.newFactory()); + EventLoopGroup workerGroup = new MultithreadEventLoopGroup(NioHandler.newFactory()); + final EchoServerHandler serverHandler = new EchoServerHandler(); + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.ALLOCATOR, allocator) + .option(ChannelOption.SO_BACKLOG, 100) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + if (sslCtx != null) { + p.addLast(sslCtx.newHandler(ch.alloc())); + } + p.addLast(new LoggingHandler(LogLevel.INFO)); + p.addLast(serverHandler); + } + }); + + // Start the server. + ChannelFuture f = b.bind(PORT).sync(); + + // Wait until the server socket is closed. + f.channel().closeFuture().sync(); + } finally { + // Shut down all event loops to terminate all threads. + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + allocator.close(); + } + } +} diff --git a/src/test/java/io/netty/buffer/api/examples/echo/EchoServerHandler.java b/src/test/java/io/netty/buffer/api/examples/echo/EchoServerHandler.java new file mode 100644 index 0000000..a3a896e --- /dev/null +++ b/src/test/java/io/netty/buffer/api/examples/echo/EchoServerHandler.java @@ -0,0 +1,44 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.examples.echo; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; + +/** + * Handler implementation for the echo server. + */ +@Sharable +public class EchoServerHandler implements ChannelHandler { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ctx.write(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // Close the connection when an exception is raised. + cause.printStackTrace(); + ctx.close(); + } +}