Merge pull request #32 from netty/bytebuf-integration

Make the incubating buffers exposable as ByteBuf
This commit is contained in:
Chris Vest 2021-03-01 11:32:16 +01:00 committed by GitHub
commit 1d73f655bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 2178 additions and 18 deletions

125
README.adoc Normal file
View File

@ -0,0 +1,125 @@
= Netty Incubator Buffer API
This repository is incubating a new buffer API proposed for Netty 5.
== Building and Testing
Short version: just run `make`.
The project currently relies on snapshot versions of the https://github.com/openjdk/panama-foreign[Panama Foreign] fork of OpenJDK.
This allows us to test out the most recent version of the `jdk.incubator.foreign` APIs, but also make building, and local development more involved.
To simplify things, we have a Docker based build, controlled via a Makefile with the following commands:
* `image` build the docker image.This includes building a snapshot of OpenJDK, and download all relevant Maven dependencies.
* `test` run all tests in a docker container.This implies `image`.The container is automatically deleted afterwards.
* `dbg` drop into a shell in the build container, without running the build itself.The debugging container is not deleted afterwards.
* `clean` remove the leftover containers created by `dbg`, `test`, and `build`.
* `build` build binaries and run all tests in a container, and copy the `target` directory out of the container afterwards.This is the default build target.
== Example: Echo Client and Server
Making use of this new buffer API on the client side is quite easy.
Even though Netty 5 does not have native support for these buffers, it is able to convert them to the old `ByteBuf` API as needed.
This means we are able to send incubator buffers through a Netty pipeline, and have it work as if we were sending `ByteBuf` instances.
[source,java]
----
public final class Client {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory());
try (BufferAllocator allocator = BufferAllocator.pooledDirect()) { // <1>
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
Buffer message = allocator.allocate(256); // <2>
for (int i = 0; i < message.capacity(); i++) {
message.writeByte((byte) i);
}
ctx.writeAndFlush(message); // <3>
}
});
}
});
// Start the client.
ChannelFuture f = b.connect("127.0.0.1", 8007).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}
----
<1> A life-cycled allocator is created to wrap the scope of our application.
<2> Buffers are allocated with one of the `allocate` methods.
<3> The buffer can then be sent down the pipeline, and will be written to the socket just like a `ByteBuf` would.
[NOTE]
--
The same is not the case for `BufferHolder`.
It is not treated the same as a `ByteBufHolder`.
--
On the server size, things are more complicated because Netty itself will be allocating the buffers, and the `ByteBufAllocator` API is only capable of returning `ByteBuf` instances.
The `ByteBufAllocatorAdaptor` will allocate `ByteBuf` instances that are backed by the new buffers.
The buffers can then we extracted from the `ByteBuf` instances with the `ByteBufAdaptor.extract` method.
We can tell a Netty server how to allocate buffers by setting the `ALLOCATOR` child-channel option:
[source,java]
----
ByteBufAllocatorAdaptor allocator = new ByteBufAllocatorAdaptor(); // <1>
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.ALLOCATOR, allocator) // <2>
.handler(new EchoServerHandler());
----
<1> The `ByteBufAllocatorAdaptor` implements `ByteBufAllocator`, and directly allocates `ByteBuf` instances that are backed by buffers that use the new API.
<2> To make Netty use a given allocator when allocating buffers for receiving data, we set the allocator as a child option.
With the above, we just changed how the buffers are allocated, but we haven't changed the API we use for interacting with the buffers.
The buffers are still allocated at `ByteBuf` instances, and flow through the pipeline as such.
If we want to use the new buffer API in our server handlers, we have to extract the buffers from the `ByteBuf` instances that are passed down:
[source,java]
----
import io.netty.buffer.ByteBuf;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.adaptor.ByteBufAdaptor;
@Sharable
public class EchoServerHandler implements ChannelHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // <1>
if (msg instanceof ByteBuf) { // <2>
// For this example, we only echo back buffers that are using the new buffer API.
Buffer buf = ByteBufAdaptor.extract((ByteBuf) msg); // <3>
ctx.write(buf); // <4>
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
}
----
<1> Netty pipelines are defined as transferring `Object` instances as messages.
<2> When we receive data directly from a socket, these messages will be `ByteBuf` instances with the received data.
<3> Since we set the allocator to create `ByteBuf` instances that are backed by buffers with the new API, we will be able to extract the backing `Buffer` instances.
<4> We can then operate on the extracted `Buffer` instances directly.
The `Buffer` and `ByteBuf` instances mirror each other exactly.
In this case, we just write them back to the client that sent the data to us.
The files in `src/test/java/io/netty/buffer/api/examples/echo` for the full source code to this example.

View File

@ -1,17 +0,0 @@
# Netty Incubator Buffer API
This repository is incubating a new buffer API proposed for Netty 5.
## Building and Testing
Short version: just run `make`.
The project currently relies on snapshot versions of the [Panama Foreign](https://github.com/openjdk/panama-foreign) fork of OpenJDK.
This allows us to test out the most recent version of the `jdk.incubator.foreign` APIs, but also make building, and local development more involved.
To simplify things, we have a Docker based build, controlled via a Makefile with the following commands:
* `image` build the docker image. This includes building a snapshot of OpenJDK, and download all relevant Maven dependencies.
* `test` run all tests in a docker container. This implies `image`. The container is automatically deleted afterwards.
* `dbg` drop into a shell in the build container, without running the build itself. The debugging container is not deleted afterwards.
* `clean` remove the leftover containers created by `dbg`, `test`, and `build`.
* `build` build binaries and run all tests in a container, and copy the `target` directory out of the container afterwards. This is the default build target.

View File

@ -394,6 +394,12 @@
<version>${netty.build.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>

View File

@ -190,4 +190,9 @@ public abstract class BufferHolder<T extends BufferHolder<T>> implements Rc<T> {
protected final Buffer getBufVolatile() {
return (Buffer) BUF.getVolatile(this);
}
@Override
public boolean isAccessible() {
return buf.isAccessible();
}
}

View File

@ -87,4 +87,13 @@ public interface Rc<I extends Rc<I>> extends AutoCloseable, Deref<I> {
* @return The number of borrows, if any, of this object.
*/
int countBorrows();
/**
* Check if this object is accessible.
*
* @return {@code true} if this object is still valid and can be accessed,
* otherwise {@code false} if, for instance, this object has been dropped/deallocated,
* or been {@linkplain #send() sent} elsewhere.
*/
boolean isAccessible();
}

View File

@ -102,6 +102,11 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
return Math.max(acquires, 0);
}
@Override
public boolean isAccessible() {
return acquires >= 0;
}
/**
* Prepare this instance for ownsership transfer. This method is called from {@link #send()} in the sending thread.
* This method should put this Rc in a deactivated state where it is no longer accessible from the currently owning

View File

@ -0,0 +1,25 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.adaptor;
import io.netty.buffer.ByteBufConvertible;
import io.netty.util.ReferenceCounted;
/**
* Interfaces that are required for an object to stand-in for a {@link io.netty.buffer.ByteBuf} in Netty.
*/
public interface BufferIntegratable extends ByteBufConvertible, ReferenceCounted {
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,159 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.adaptor;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.api.BufferAllocator;
public class ByteBufAllocatorAdaptor implements ByteBufAllocator, AutoCloseable {
private final BufferAllocator onheap;
private final BufferAllocator offheap;
private boolean closed;
public ByteBufAllocatorAdaptor() {
this(BufferAllocator.pooledHeap(), BufferAllocator.pooledDirect());
}
public ByteBufAllocatorAdaptor(BufferAllocator onheap, BufferAllocator offheap) {
this.onheap = onheap;
this.offheap = offheap;
}
@Override
public ByteBuf buffer() {
return buffer(256);
}
public BufferAllocator getOnHeap() {
return onheap;
}
public BufferAllocator getOffHeap() {
return offheap;
}
public boolean isClosed() {
return closed;
}
@Override
public ByteBuf buffer(int initialCapacity) {
return new ByteBufAdaptor(this, onheap.allocate(initialCapacity));
}
@Override
public ByteBuf buffer(int initialCapacity, int maxCapacity) {
return buffer(maxCapacity);
}
@Override
public ByteBuf ioBuffer() {
return directBuffer();
}
@Override
public ByteBuf ioBuffer(int initialCapacity) {
return directBuffer(initialCapacity);
}
@Override
public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
return directBuffer(initialCapacity, maxCapacity);
}
@Override
public ByteBuf heapBuffer() {
return buffer();
}
@Override
public ByteBuf heapBuffer(int initialCapacity) {
return buffer(initialCapacity);
}
@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
return buffer(initialCapacity, maxCapacity);
}
@Override
public ByteBuf directBuffer() {
return directBuffer(256);
}
@Override
public ByteBuf directBuffer(int initialCapacity) {
// TODO we cannot use off-heap buffers here, until the JDK allows direct byte buffers based on native
// memory segments to be used in IO operations.
return new ByteBufAdaptor(this, onheap.allocate(initialCapacity));
}
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
return directBuffer(maxCapacity);
}
@Override
public CompositeByteBuf compositeBuffer() {
return compositeHeapBuffer();
}
@Override
public CompositeByteBuf compositeBuffer(int maxNumComponents) {
return compositeHeapBuffer(maxNumComponents);
}
@Override
public CompositeByteBuf compositeHeapBuffer() {
return compositeHeapBuffer(1024);
}
@Override
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
return new CompositeByteBuf(this, false, maxNumComponents, heapBuffer());
}
@Override
public CompositeByteBuf compositeDirectBuffer() {
return compositeDirectBuffer(1024);
}
@Override
public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
return new CompositeByteBuf(this, true, maxNumComponents, directBuffer());
}
@Override
public boolean isDirectBufferPooled() {
return true;
}
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
return 0;
}
@Override
public void close() throws Exception {
try (onheap) {
try (offheap) {
closed = true;
}
}
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Helpers for integrating with the existing {@link io.netty.buffer.ByteBuf} API.
*/
package io.netty.buffer.api.adaptor;

View File

@ -15,6 +15,7 @@
*/
package io.netty.buffer.api.memseg;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.AllocatorControl;
import io.netty.buffer.api.Buffer;
@ -26,6 +27,9 @@ import io.netty.buffer.api.WritableComponentProcessor;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.Owned;
import io.netty.buffer.api.RcSupport;
import io.netty.buffer.api.adaptor.BufferIntegratable;
import io.netty.buffer.api.adaptor.ByteBufAdaptor;
import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
import jdk.incubator.foreign.MemorySegment;
import java.nio.ByteBuffer;
@ -46,7 +50,8 @@ import static jdk.incubator.foreign.MemoryAccess.setIntAtOffset;
import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset;
import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset;
class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, ReadableComponent, WritableComponent {
class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, ReadableComponent, WritableComponent,
BufferIntegratable {
private static final MemorySegment CLOSED_SEGMENT;
static final Drop<MemSegBuffer> SEGMENT_CLOSE;
@ -1138,6 +1143,66 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
return new RecoverableMemory(seg, alloc);
}
// <editor-fold name="BufferIntegratable methods">
private ByteBufAdaptor adaptor;
@Override
public ByteBuf asByteBuf() {
ByteBufAdaptor bba = adaptor;
if (bba == null) {
ByteBufAllocatorAdaptor alloc = new ByteBufAllocatorAdaptor(
BufferAllocator.heap(), BufferAllocator.direct());
return adaptor = new ByteBufAdaptor(alloc, this);
}
return bba;
}
@Override
public int readableBytes() {
return writerOffset() - readerOffset();
}
@Override
public MemSegBuffer retain(int increment) {
for (int i = 0; i < increment; i++) {
acquire();
}
return this;
}
@Override
public int refCnt() {
return isAccessible()? 1 + countBorrows() : 0;
}
@Override
public MemSegBuffer retain() {
return retain(1);
}
@Override
public MemSegBuffer touch() {
return this;
}
@Override
public MemSegBuffer touch(Object hint) {
return this;
}
@Override
public boolean release() {
return release(1);
}
@Override
public boolean release(int decrement) {
for (int i = 0; i < decrement; i++) {
close();
}
return !isAccessible();
}
// </editor-fold>
static final class RecoverableMemory {
private final MemorySegment segment;
private final AllocatorControl alloc;

View File

@ -0,0 +1,150 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
import io.netty.buffer.api.examples.echo.EchoServerHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.nio.NioHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.junit.jupiter.api.Test;
import java.net.InetSocketAddress;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class EchoIT {
// In this test we have a server and a client, where the server echos back anything it receives,
// and our client sends a single message to the server, and then verifies that it receives it back.
@Test
void echoServerMustReplyWithSameData() throws Exception {
ByteBufAllocatorAdaptor allocator = new ByteBufAllocatorAdaptor();
EventLoopGroup bossGroup = new MultithreadEventLoopGroup(1, NioHandler.newFactory());
EventLoopGroup workerGroup = new MultithreadEventLoopGroup(NioHandler.newFactory());
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.ALLOCATOR, allocator)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture bind = server.bind("localhost", 0).sync();
InetSocketAddress serverAddress = (InetSocketAddress) bind.channel().localAddress();
// Configure the client.
EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory());
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(serverAddress).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
// Shut down the server.
bind.channel().close().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
allocator.close();
}
}
static class EchoClientHandler implements ChannelHandler {
private static final int SIZE = 256;
private final Buffer firstMessage;
/**
* Creates a client-side handler.
*/
EchoClientHandler() {
firstMessage = BufferAllocator.heap().allocate(SIZE);
for (int i = 0; i < SIZE; i++) {
firstMessage.writeByte((byte) i);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
assertEquals(SIZE, buf.capacity());
assertEquals(SIZE, buf.readableBytes());
for (int i = 0; i < SIZE; i++) {
assertEquals((byte) i, buf.readByte());
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
ctx.close();
throw new RuntimeException(cause);
}
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.echo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.nio.NioHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
/**
* Sends one message when a connection is open and echoes back any received
* data to the server. Simply put, the echo client initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*/
public final class EchoClient {
static final boolean SSL = System.getProperty("ssl") != null;
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
// Configure SSL.git
final SslContext sslCtx;
if (SSL) {
sslCtx = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
// Configure the client.
EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory());
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.echo;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
/**
* Handler implementation for the echo client. It initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*/
public class EchoClientHandler implements ChannelHandler {
private final Buffer firstMessage;
/**
* Creates a client-side handler.
*/
public EchoClientHandler() {
firstMessage = BufferAllocator.heap().allocate(EchoClient.SIZE);
for (int i = 0; i < firstMessage.capacity(); i ++) {
firstMessage.writeByte((byte) i);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}

View File

@ -0,0 +1,89 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.echo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.nio.NioHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
ByteBufAllocatorAdaptor allocator = new ByteBufAllocatorAdaptor();
EventLoopGroup bossGroup = new MultithreadEventLoopGroup(1, NioHandler.newFactory());
EventLoopGroup workerGroup = new MultithreadEventLoopGroup(NioHandler.newFactory());
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.ALLOCATOR, allocator)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
allocator.close();
}
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.echo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.adaptor.ByteBufAdaptor;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler implements ChannelHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf) {
// For this example, we only echo back buffers that are using the new buffer API.
Buffer buf = ByteBufAdaptor.extract((ByteBuf) msg);
ctx.write(buf);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}