Make the incubating buffers exposable as ByteBuf

Motivation:
This makes it possible to use the new buffer API in Netty as is.

Modification:
Make the MemSegBuffer implementation class implement AsByteBuf and ReferenceCounted.
The produced ByteBuf instance delegates all calls to the underlying Buffer instance as faithfully as possible.
One area where the two deviates, is that it's not possible to create non-retained duplicates and slices with the new buffer API.

Result:
It is now possible to use the new buffer API on both client and server side.
The Echo* examples demonstrate this, and the EchoIT proves it with a test.
The API is used more directly on the client side, since the server-side allocator in Netty does not know how to allocate buffers with the incubating API.
This commit is contained in:
Chris Vest 2021-02-17 13:54:11 +01:00
parent f14a77961c
commit 1b65bf9a23
14 changed files with 2023 additions and 1 deletions

View File

@ -394,6 +394,12 @@
<version>${netty.build.version}</version> <version>${netty.build.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.openjdk.jmh</groupId> <groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId> <artifactId>jmh-core</artifactId>

View File

@ -190,4 +190,9 @@ public abstract class BufferHolder<T extends BufferHolder<T>> implements Rc<T> {
protected final Buffer getBufVolatile() { protected final Buffer getBufVolatile() {
return (Buffer) BUF.getVolatile(this); return (Buffer) BUF.getVolatile(this);
} }
@Override
public boolean isAccessible() {
return buf.isAccessible();
}
} }

View File

@ -87,4 +87,13 @@ public interface Rc<I extends Rc<I>> extends AutoCloseable, Deref<I> {
* @return The number of borrows, if any, of this object. * @return The number of borrows, if any, of this object.
*/ */
int countBorrows(); int countBorrows();
/**
* Check if this object is accessible.
*
* @return {@code true} if this object is still valid and can be accessed,
* otherwise {@code false} if, for instance, this object has been dropped/deallocated,
* or been {@linkplain #send() sent} elsewhere.
*/
boolean isAccessible();
} }

View File

@ -102,6 +102,11 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
return Math.max(acquires, 0); return Math.max(acquires, 0);
} }
@Override
public boolean isAccessible() {
return acquires >= 0;
}
/** /**
* Prepare this instance for ownsership transfer. This method is called from {@link #send()} in the sending thread. * Prepare this instance for ownsership transfer. This method is called from {@link #send()} in the sending thread.
* This method should put this Rc in a deactivated state where it is no longer accessible from the currently owning * This method should put this Rc in a deactivated state where it is no longer accessible from the currently owning

View File

@ -0,0 +1,25 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.adaptor;
import io.netty.buffer.ByteBufConvertible;
import io.netty.util.ReferenceCounted;
/**
* Interfaces that are required for an object to stand-in for a {@link io.netty.buffer.ByteBuf} in Netty.
*/
public interface BufferIntegratable extends ByteBufConvertible, ReferenceCounted {
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,157 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.adaptor;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.api.BufferAllocator;
public class ByteBufAllocatorAdaptor implements ByteBufAllocator, AutoCloseable {
private final BufferAllocator onheap;
private final BufferAllocator offheap;
private boolean closed;
public ByteBufAllocatorAdaptor() {
this(BufferAllocator.pooledHeap(), BufferAllocator.pooledDirect());
}
public ByteBufAllocatorAdaptor(BufferAllocator onheap, BufferAllocator offheap) {
this.onheap = onheap;
this.offheap = offheap;
}
@Override
public ByteBuf buffer() {
return buffer(256);
}
public BufferAllocator getOnHeap() {
return onheap;
}
public BufferAllocator getOffHeap() {
return offheap;
}
public boolean isClosed() {
return closed;
}
@Override
public ByteBuf buffer(int initialCapacity) {
return new ByteBufAdaptor(this, onheap.allocate(initialCapacity));
}
@Override
public ByteBuf buffer(int initialCapacity, int maxCapacity) {
return buffer(maxCapacity);
}
@Override
public ByteBuf ioBuffer() {
return directBuffer();
}
@Override
public ByteBuf ioBuffer(int initialCapacity) {
return directBuffer(initialCapacity);
}
@Override
public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
return directBuffer(initialCapacity, maxCapacity);
}
@Override
public ByteBuf heapBuffer() {
return buffer();
}
@Override
public ByteBuf heapBuffer(int initialCapacity) {
return buffer(initialCapacity);
}
@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
return buffer(initialCapacity, maxCapacity);
}
@Override
public ByteBuf directBuffer() {
return directBuffer(256);
}
@Override
public ByteBuf directBuffer(int initialCapacity) {
return new ByteBufAdaptor(this, offheap.allocate(initialCapacity));
}
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
return directBuffer(maxCapacity);
}
@Override
public CompositeByteBuf compositeBuffer() {
return compositeHeapBuffer();
}
@Override
public CompositeByteBuf compositeBuffer(int maxNumComponents) {
return compositeHeapBuffer(maxNumComponents);
}
@Override
public CompositeByteBuf compositeHeapBuffer() {
return compositeHeapBuffer(1024);
}
@Override
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
return new CompositeByteBuf(this, false, maxNumComponents, heapBuffer());
}
@Override
public CompositeByteBuf compositeDirectBuffer() {
return compositeDirectBuffer(1024);
}
@Override
public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
return new CompositeByteBuf(this, true, maxNumComponents, directBuffer());
}
@Override
public boolean isDirectBufferPooled() {
return true;
}
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
return 0;
}
@Override
public void close() throws Exception {
try (onheap) {
try (offheap) {
closed = true;
}
}
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Helpers for integrating with the existing {@link io.netty.buffer.ByteBuf} API.
*/
package io.netty.buffer.api.adaptor;

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.buffer.api.memseg; package io.netty.buffer.api.memseg;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.api.BufferAllocator; import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.AllocatorControl; import io.netty.buffer.api.AllocatorControl;
import io.netty.buffer.api.Buffer; import io.netty.buffer.api.Buffer;
@ -26,6 +27,9 @@ import io.netty.buffer.api.WritableComponentProcessor;
import io.netty.buffer.api.Drop; import io.netty.buffer.api.Drop;
import io.netty.buffer.api.Owned; import io.netty.buffer.api.Owned;
import io.netty.buffer.api.RcSupport; import io.netty.buffer.api.RcSupport;
import io.netty.buffer.api.adaptor.BufferIntegratable;
import io.netty.buffer.api.adaptor.ByteBufAdaptor;
import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
import jdk.incubator.foreign.MemorySegment; import jdk.incubator.foreign.MemorySegment;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -46,7 +50,8 @@ import static jdk.incubator.foreign.MemoryAccess.setIntAtOffset;
import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset; import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset;
import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset; import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset;
class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, ReadableComponent, WritableComponent { class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, ReadableComponent, WritableComponent,
BufferIntegratable {
private static final MemorySegment CLOSED_SEGMENT; private static final MemorySegment CLOSED_SEGMENT;
static final Drop<MemSegBuffer> SEGMENT_CLOSE; static final Drop<MemSegBuffer> SEGMENT_CLOSE;
@ -1138,6 +1143,66 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
return new RecoverableMemory(seg, alloc); return new RecoverableMemory(seg, alloc);
} }
// <editor-fold name="BufferIntegratable methods">
private ByteBufAdaptor adaptor;
@Override
public ByteBuf asByteBuf() {
ByteBufAdaptor bba = adaptor;
if (bba == null) {
ByteBufAllocatorAdaptor alloc = new ByteBufAllocatorAdaptor(
BufferAllocator.heap(), BufferAllocator.direct());
return adaptor = new ByteBufAdaptor(alloc, this);
}
return bba;
}
@Override
public int readableBytes() {
return writerOffset() - readerOffset();
}
@Override
public MemSegBuffer retain(int increment) {
for (int i = 0; i < increment; i++) {
acquire();
}
return this;
}
@Override
public int refCnt() {
return isAccessible()? 1 + countBorrows() : 0;
}
@Override
public MemSegBuffer retain() {
return retain(1);
}
@Override
public MemSegBuffer touch() {
return this;
}
@Override
public MemSegBuffer touch(Object hint) {
return this;
}
@Override
public boolean release() {
return release(1);
}
@Override
public boolean release(int decrement) {
for (int i = 0; i < decrement; i++) {
close();
}
return !isAccessible();
}
// </editor-fold>
static final class RecoverableMemory { static final class RecoverableMemory {
private final MemorySegment segment; private final MemorySegment segment;
private final AllocatorControl alloc; private final AllocatorControl alloc;

View File

@ -0,0 +1,150 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
import io.netty.buffer.api.examples.echo.EchoServerHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.nio.NioHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.junit.jupiter.api.Test;
import java.net.InetSocketAddress;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class EchoIT {
// In this test we have a server and a client, where the server echos back anything it receives,
// and our client sends a single message to the server, and then verifies that it receives it back.
@Test
void echoServerMustReplyWithSameData() throws Exception {
ByteBufAllocatorAdaptor allocator = new ByteBufAllocatorAdaptor();
EventLoopGroup bossGroup = new MultithreadEventLoopGroup(1, NioHandler.newFactory());
EventLoopGroup workerGroup = new MultithreadEventLoopGroup(NioHandler.newFactory());
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.ALLOCATOR, allocator)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture bind = server.bind("localhost", 0).sync();
InetSocketAddress serverAddress = (InetSocketAddress) bind.channel().localAddress();
// Configure the client.
EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory());
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(serverAddress).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
// Shut down the server.
bind.channel().close().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
allocator.close();
}
}
static class EchoClientHandler implements ChannelHandler {
private static final int SIZE = 256;
private final Buffer firstMessage;
/**
* Creates a client-side handler.
*/
EchoClientHandler() {
firstMessage = BufferAllocator.heap().allocate(SIZE);
for (int i = 0; i < SIZE; i ++) {
firstMessage.writeByte((byte) i);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
assertEquals(SIZE, buf.capacity());
assertEquals(SIZE, buf.readableBytes());
for (int i = 0; i < SIZE; i++) {
assertEquals((byte) i, buf.readByte());
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
ctx.close();
throw new RuntimeException(cause);
}
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.echo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.nio.NioHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
/**
* Sends one message when a connection is open and echoes back any received
* data to the server. Simply put, the echo client initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*/
public final class EchoClient {
static final boolean SSL = System.getProperty("ssl") != null;
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
// Configure SSL.git
final SslContext sslCtx;
if (SSL) {
sslCtx = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
// Configure the client.
EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory());
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.echo;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
/**
* Handler implementation for the echo client. It initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*/
public class EchoClientHandler implements ChannelHandler {
private final Buffer firstMessage;
/**
* Creates a client-side handler.
*/
public EchoClientHandler() {
firstMessage = BufferAllocator.heap().allocate(EchoClient.SIZE);
for (int i = 0; i < firstMessage.capacity(); i ++) {
firstMessage.writeByte((byte) i);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}

View File

@ -0,0 +1,89 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.echo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.nio.NioHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
ByteBufAllocatorAdaptor allocator = new ByteBufAllocatorAdaptor();
EventLoopGroup bossGroup = new MultithreadEventLoopGroup(1, NioHandler.newFactory());
EventLoopGroup workerGroup = new MultithreadEventLoopGroup(NioHandler.newFactory());
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.ALLOCATOR, allocator)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
allocator.close();
}
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.echo;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler implements ChannelHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}