SCTP: Reduce object allocation overhead and fix receive buffer allocation
There are a couple of changes here all related to making the SCTP transport less garbage-heavy: - Remove the SctpNotificationEvent and just passes along the JDK NIO Notification, as passing the Notification and always null inside a wrapped object seemed a little bit superfluous - Apply @trustin's changes to receive buffer allocation to SCTP transport, and also makes the SCTP transport use the configured buffer allocator rather than always creating a direct buffer (which seems like a bug)
This commit is contained in:
parent
0d9aecbbc1
commit
f0ad079737
@ -1,105 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2011 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package io.netty.channel.sctp;
|
|
||||||
|
|
||||||
import com.sun.nio.sctp.Notification;
|
|
||||||
import io.netty.channel.ChannelHandler;
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
|
||||||
import io.netty.channel.ChannelPipeline;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A Notification event which carries a {@link Notification} from the SCTP stack to a SCTP {@link ChannelPipeline}.
|
|
||||||
* <p>
|
|
||||||
* Following notifications may be supported by a {@link SctpChannel};
|
|
||||||
* AssociationChangeNotification, PeerAddressChangeNotification, SendFailedNotification, ShutdownNotification and
|
|
||||||
* additional implementation specific notifications.
|
|
||||||
*</p>
|
|
||||||
*
|
|
||||||
* <p>
|
|
||||||
* User can handle the notification events of a {@link SctpChannel} by override the following method
|
|
||||||
* {@link ChannelHandler#userEventTriggered(ChannelHandlerContext, Object)}.
|
|
||||||
* </p>
|
|
||||||
*/
|
|
||||||
public final class SctpNotificationEvent {
|
|
||||||
private final Notification notification;
|
|
||||||
private final Object attachment;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new instance
|
|
||||||
*
|
|
||||||
* @param notification the {@link Notification} which triggered this event
|
|
||||||
* @param attachment the attachment or {@code null} if non is attached.
|
|
||||||
*/
|
|
||||||
public SctpNotificationEvent(Notification notification, Object attachment) {
|
|
||||||
if (notification == null) {
|
|
||||||
throw new NullPointerException("notification");
|
|
||||||
}
|
|
||||||
this.notification = notification;
|
|
||||||
this.attachment = attachment;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the {@link Notification}
|
|
||||||
*/
|
|
||||||
public Notification notification() {
|
|
||||||
return notification;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the attachment of this {@link SctpNotificationEvent}, or
|
|
||||||
* {@code null} if no attachment was provided
|
|
||||||
*/
|
|
||||||
public Object attachment() {
|
|
||||||
return attachment;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object o) {
|
|
||||||
if (this == o) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (o == null || getClass() != o.getClass()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
SctpNotificationEvent that = (SctpNotificationEvent) o;
|
|
||||||
|
|
||||||
if (!attachment.equals(that.attachment)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!notification.equals(that.notification)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
int result = notification.hashCode();
|
|
||||||
result = 31 * result + attachment.hashCode();
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return "SctpNotification{" +
|
|
||||||
"notification=" + notification +
|
|
||||||
", attachment=" + attachment +
|
|
||||||
'}';
|
|
||||||
}
|
|
||||||
}
|
|
@ -28,7 +28,7 @@ import io.netty.channel.ChannelPipeline;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link AbstractNotificationHandler} implementation which will handle all {@link Notification}s by trigger a
|
* {@link AbstractNotificationHandler} implementation which will handle all {@link Notification}s by trigger a
|
||||||
* {@link SctpNotificationEvent} in the {@link ChannelPipeline} of a {@link SctpChannel}.
|
* {@link Notification} user event in the {@link ChannelPipeline} of a {@link SctpChannel}.
|
||||||
*/
|
*/
|
||||||
public final class SctpNotificationHandler extends AbstractNotificationHandler<Object> {
|
public final class SctpNotificationHandler extends AbstractNotificationHandler<Object> {
|
||||||
|
|
||||||
@ -43,31 +43,31 @@ public final class SctpNotificationHandler extends AbstractNotificationHandler<O
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HandlerResult handleNotification(AssociationChangeNotification notification, Object o) {
|
public HandlerResult handleNotification(AssociationChangeNotification notification, Object o) {
|
||||||
fireEvent(notification, o);
|
fireEvent(notification);
|
||||||
return HandlerResult.CONTINUE;
|
return HandlerResult.CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HandlerResult handleNotification(PeerAddressChangeNotification notification, Object o) {
|
public HandlerResult handleNotification(PeerAddressChangeNotification notification, Object o) {
|
||||||
fireEvent(notification, o);
|
fireEvent(notification);
|
||||||
return HandlerResult.CONTINUE;
|
return HandlerResult.CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HandlerResult handleNotification(SendFailedNotification notification, Object o) {
|
public HandlerResult handleNotification(SendFailedNotification notification, Object o) {
|
||||||
fireEvent(notification, o);
|
fireEvent(notification);
|
||||||
return HandlerResult.CONTINUE;
|
return HandlerResult.CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HandlerResult handleNotification(ShutdownNotification notification, Object o) {
|
public HandlerResult handleNotification(ShutdownNotification notification, Object o) {
|
||||||
fireEvent(notification, o);
|
fireEvent(notification);
|
||||||
sctpChannel.close();
|
sctpChannel.close();
|
||||||
return HandlerResult.RETURN;
|
return HandlerResult.RETURN;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fireEvent(Notification notification, Object o) {
|
private void fireEvent(Notification notification) {
|
||||||
sctpChannel.pipeline().fireUserEventTriggered(new SctpNotificationEvent(notification, o));
|
sctpChannel.pipeline().fireUserEventTriggered(notification);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,6 +26,7 @@ import io.netty.channel.ChannelFuture;
|
|||||||
import io.netty.channel.ChannelMetadata;
|
import io.netty.channel.ChannelMetadata;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.channel.MessageList;
|
import io.netty.channel.MessageList;
|
||||||
|
import io.netty.channel.RecvByteBufAllocator;
|
||||||
import io.netty.channel.nio.AbstractNioMessageChannel;
|
import io.netty.channel.nio.AbstractNioMessageChannel;
|
||||||
import io.netty.channel.sctp.DefaultSctpChannelConfig;
|
import io.netty.channel.sctp.DefaultSctpChannelConfig;
|
||||||
import io.netty.channel.sctp.SctpChannelConfig;
|
import io.netty.channel.sctp.SctpChannelConfig;
|
||||||
@ -64,6 +65,8 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
|
|||||||
|
|
||||||
private final NotificationHandler<?> notificationHandler;
|
private final NotificationHandler<?> notificationHandler;
|
||||||
|
|
||||||
|
private RecvByteBufAllocator.Handle allocHandle;
|
||||||
|
|
||||||
private static SctpChannel newSctpChannel() {
|
private static SctpChannel newSctpChannel() {
|
||||||
try {
|
try {
|
||||||
return SctpChannel.open();
|
return SctpChannel.open();
|
||||||
@ -260,7 +263,12 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
|
|||||||
@Override
|
@Override
|
||||||
protected int doReadMessages(MessageList<Object> buf) throws Exception {
|
protected int doReadMessages(MessageList<Object> buf) throws Exception {
|
||||||
SctpChannel ch = javaChannel();
|
SctpChannel ch = javaChannel();
|
||||||
ByteBuf buffer = alloc().directBuffer(config().getReceiveBufferSize());
|
|
||||||
|
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
|
||||||
|
if (allocHandle == null) {
|
||||||
|
this.allocHandle = allocHandle = config().getRecvByteBufAllocator().newHandle();
|
||||||
|
}
|
||||||
|
ByteBuf buffer = allocHandle.allocate(config().getAllocator());
|
||||||
boolean free = true;
|
boolean free = true;
|
||||||
try {
|
try {
|
||||||
ByteBuffer data = buffer.nioBuffer(buffer.writerIndex(), buffer.writableBytes());
|
ByteBuffer data = buffer.nioBuffer(buffer.writerIndex(), buffer.writableBytes());
|
||||||
@ -277,6 +285,8 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
|
|||||||
PlatformDependent.throwException(cause);
|
PlatformDependent.throwException(cause);
|
||||||
return -1;
|
return -1;
|
||||||
} finally {
|
} finally {
|
||||||
|
int bytesRead = buffer.readableBytes();
|
||||||
|
allocHandle.record(bytesRead);
|
||||||
if (free) {
|
if (free) {
|
||||||
buffer.release();
|
buffer.release();
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,7 @@ import io.netty.channel.ChannelFuture;
|
|||||||
import io.netty.channel.ChannelMetadata;
|
import io.netty.channel.ChannelMetadata;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.channel.MessageList;
|
import io.netty.channel.MessageList;
|
||||||
|
import io.netty.channel.RecvByteBufAllocator;
|
||||||
import io.netty.channel.oio.AbstractOioMessageChannel;
|
import io.netty.channel.oio.AbstractOioMessageChannel;
|
||||||
import io.netty.channel.sctp.DefaultSctpChannelConfig;
|
import io.netty.channel.sctp.DefaultSctpChannelConfig;
|
||||||
import io.netty.channel.sctp.SctpChannelConfig;
|
import io.netty.channel.sctp.SctpChannelConfig;
|
||||||
@ -72,6 +73,8 @@ public class OioSctpChannel extends AbstractOioMessageChannel
|
|||||||
|
|
||||||
private final NotificationHandler<?> notificationHandler;
|
private final NotificationHandler<?> notificationHandler;
|
||||||
|
|
||||||
|
private RecvByteBufAllocator.Handle allocHandle;
|
||||||
|
|
||||||
private static SctpChannel openChannel() {
|
private static SctpChannel openChannel() {
|
||||||
try {
|
try {
|
||||||
return SctpChannel.open();
|
return SctpChannel.open();
|
||||||
@ -182,7 +185,11 @@ public class OioSctpChannel extends AbstractOioMessageChannel
|
|||||||
Set<SelectionKey> reableKeys = readSelector.selectedKeys();
|
Set<SelectionKey> reableKeys = readSelector.selectedKeys();
|
||||||
try {
|
try {
|
||||||
for (SelectionKey ignored : reableKeys) {
|
for (SelectionKey ignored : reableKeys) {
|
||||||
ByteBuf buffer = alloc().directBuffer(config().getReceiveBufferSize());
|
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
|
||||||
|
if (allocHandle == null) {
|
||||||
|
this.allocHandle = allocHandle = config().getRecvByteBufAllocator().newHandle();
|
||||||
|
}
|
||||||
|
ByteBuf buffer = allocHandle.allocate(config().getAllocator());
|
||||||
boolean free = true;
|
boolean free = true;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -199,6 +206,8 @@ public class OioSctpChannel extends AbstractOioMessageChannel
|
|||||||
} catch (Throwable cause) {
|
} catch (Throwable cause) {
|
||||||
PlatformDependent.throwException(cause);
|
PlatformDependent.throwException(cause);
|
||||||
} finally {
|
} finally {
|
||||||
|
int bytesRead = buffer.readableBytes();
|
||||||
|
allocHandle.record(bytesRead);
|
||||||
if (free) {
|
if (free) {
|
||||||
buffer.release();
|
buffer.release();
|
||||||
}
|
}
|
||||||
|
@ -17,7 +17,6 @@ package io.netty.channel.sctp.oio;
|
|||||||
|
|
||||||
import com.sun.nio.sctp.SctpChannel;
|
import com.sun.nio.sctp.SctpChannel;
|
||||||
import com.sun.nio.sctp.SctpServerChannel;
|
import com.sun.nio.sctp.SctpServerChannel;
|
||||||
import io.netty.buffer.ByteBufUtil;
|
|
||||||
import io.netty.channel.ChannelException;
|
import io.netty.channel.ChannelException;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelMetadata;
|
import io.netty.channel.ChannelMetadata;
|
||||||
@ -26,6 +25,7 @@ import io.netty.channel.MessageList;
|
|||||||
import io.netty.channel.oio.AbstractOioMessageChannel;
|
import io.netty.channel.oio.AbstractOioMessageChannel;
|
||||||
import io.netty.channel.sctp.DefaultSctpServerChannelConfig;
|
import io.netty.channel.sctp.DefaultSctpServerChannelConfig;
|
||||||
import io.netty.channel.sctp.SctpServerChannelConfig;
|
import io.netty.channel.sctp.SctpServerChannelConfig;
|
||||||
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import io.netty.util.internal.logging.InternalLogger;
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
@ -299,7 +299,7 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel
|
|||||||
protected int doWrite(MessageList<Object> msgs, int index) throws Exception {
|
protected int doWrite(MessageList<Object> msgs, int index) throws Exception {
|
||||||
int size = msgs.size();
|
int size = msgs.size();
|
||||||
for (int i = index; i < size; i ++) {
|
for (int i = index; i < size; i ++) {
|
||||||
ByteBufUtil.release(msgs.get(i));
|
ReferenceCountUtil.release(msgs.get(i));
|
||||||
}
|
}
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user