cf171ff525
Motivation: The current read loops don't facilitate reading a maximum amount of bytes. This capability is useful to have more fine-grained control over how much data is ingested. Modifications: - Add a setMaxBytesPerRead(int) and getMaxBytesPerRead() to ChannelConfig - Add a setMaxBytesPerIndividualRead(int) and getMaxBytesPerIndividualRead() to ChannelConfig - Add methods to RecvByteBufAllocator so that a pluggable scheme can be used to control the behavior of the read loop. - Modify read loop for all transport types to respect the new RecvByteBufAllocator API Result: The ability to control how many bytes are read for each read operation/loop, and a more extensible read loop.
124 lines
3.8 KiB
Java
124 lines
3.8 KiB
Java
/*
|
|
* Copyright 2015 The Netty Project
|
|
*
|
|
* The Netty Project licenses this file to you under the Apache License,
|
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at:
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
* License for the specific language governing permissions and limitations
|
|
* under the License.
|
|
*/
|
|
package io.netty.channel;
|
|
|
|
import io.netty.buffer.ByteBuf;
|
|
import io.netty.buffer.ByteBufAllocator;
|
|
|
|
/**
|
|
* Default implementation of {@link MaxMessagesRecvByteBufAllocator} which respects {@link ChannelConfig#isAutoRead()}
|
|
* and also prevents overflow.
|
|
*/
|
|
public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {
|
|
private volatile int maxMessagesPerRead;
|
|
|
|
public DefaultMaxMessagesRecvByteBufAllocator() {
|
|
this(1);
|
|
}
|
|
|
|
public DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead) {
|
|
maxMessagesPerRead(maxMessagesPerRead);
|
|
}
|
|
|
|
@Override
|
|
public int maxMessagesPerRead() {
|
|
return maxMessagesPerRead;
|
|
}
|
|
|
|
@Override
|
|
public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead) {
|
|
if (maxMessagesPerRead <= 0) {
|
|
throw new IllegalArgumentException("maxMessagesPerRead: " + maxMessagesPerRead + " (expected: > 0)");
|
|
}
|
|
this.maxMessagesPerRead = maxMessagesPerRead;
|
|
return this;
|
|
}
|
|
|
|
/**
|
|
* Focuses on enforcing the maximum messages per read condition for {@link #continueReading()}.
|
|
*/
|
|
public abstract class MaxMessageHandle implements Handle {
|
|
private ChannelConfig config;
|
|
private int maxMessagePerRead;
|
|
private int totalMessages;
|
|
private int totalBytesRead;
|
|
private int attemptedBytesRead;
|
|
private int lastBytesRead;
|
|
|
|
/**
|
|
* Only {@link ChannelConfig#getMaxMessagesPerRead()} is used.
|
|
*/
|
|
@Override
|
|
public void reset(ChannelConfig config) {
|
|
this.config = config;
|
|
maxMessagePerRead = maxMessagesPerRead();
|
|
totalMessages = totalBytesRead = 0;
|
|
}
|
|
|
|
@Override
|
|
public ByteBuf allocate(ByteBufAllocator alloc) {
|
|
return alloc.ioBuffer(guess());
|
|
}
|
|
|
|
@Override
|
|
public final void incMessagesRead(int amt) {
|
|
totalMessages += amt;
|
|
}
|
|
|
|
@Override
|
|
public final void lastBytesRead(int bytes) {
|
|
lastBytesRead = bytes;
|
|
// Ignore if bytes is negative, the interface contract states it will be detected externally after call.
|
|
// The value may be "invalid" after this point, but it doesn't matter because reading will be stopped.
|
|
totalBytesRead += bytes;
|
|
if (totalBytesRead < 0) {
|
|
totalBytesRead = Integer.MAX_VALUE;
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public final int lastBytesRead() {
|
|
return lastBytesRead;
|
|
}
|
|
|
|
@Override
|
|
public boolean continueReading() {
|
|
return config.isAutoRead() &&
|
|
attemptedBytesRead == lastBytesRead &&
|
|
totalMessages < maxMessagePerRead &&
|
|
totalBytesRead < Integer.MAX_VALUE;
|
|
}
|
|
|
|
@Override
|
|
public void readComplete() {
|
|
}
|
|
|
|
@Override
|
|
public int attemptedBytesRead() {
|
|
return attemptedBytesRead;
|
|
}
|
|
|
|
@Override
|
|
public void attemptedBytesRead(int bytes) {
|
|
attemptedBytesRead = bytes;
|
|
}
|
|
|
|
protected final int totalBytesRead() {
|
|
return totalBytesRead;
|
|
}
|
|
}
|
|
}
|