Access autoRead via an AtomicIntegerFieldUpdater.
Motiviation: Before this change, autoRead was a volatile boolean accessed directly. Any thread that invoked the DefaultChannelConfig#setAutoRead(boolean) method would read the current value of autoRead, and then set a new value. If the old value did not match the new value, some action would be immediately taken as part of the same method call. As volatile only provides happens-before consistency, there was no guarantee that the calling thread was actually the thread mutating the state of the autoRead variable (such that it should be the one to invoke the follow-up actions). For example, with 3 threads: * Thread 1: get = false * Thread 1: set = true * Thread 1: invokes read() * Thread 2: get = true * Thread 3: get = true * Thread 2: set = false * Thread 2: invokes autoReadCleared() * Event Loop receives notification from the Selector that data is available, but as autoRead has been cleared, cancels the operation and removes read interest * Thread 3: set = true This results in a livelock - autoRead is set true, but no reads will happen even if data is available (as readyOps). The only way around this livelock currently is to set autoRead to false, and then back to true. Modifications: Write access to the autoRead variable is now made using the getAndSet() method of an AtomicIntegerFieldUpdater, AUTOREAD_UPDATER. This also changed the type of the underlying autoRead variable to be an integer, as no AtomicBooleanFieldUpdater class exists. Boolean logic is retained by assuming that 1 is true and 0 is false. Result: There is no longer a race condition between retrieving the old value of the autoRead variable and setting a new value.
This commit is contained in:
parent
01e3bcf30c
commit
a64484249c
@ -18,7 +18,9 @@ package io.netty.channel;
|
|||||||
import io.netty.buffer.ByteBufAllocator;
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
import io.netty.channel.nio.AbstractNioByteChannel;
|
import io.netty.channel.nio.AbstractNioByteChannel;
|
||||||
import io.netty.channel.socket.SocketChannelConfig;
|
import io.netty.channel.socket.SocketChannelConfig;
|
||||||
|
import io.netty.util.internal.PlatformDependent;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||||
import java.util.IdentityHashMap;
|
import java.util.IdentityHashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
@ -35,6 +37,17 @@ public class DefaultChannelConfig implements ChannelConfig {
|
|||||||
|
|
||||||
private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
|
private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
|
||||||
|
|
||||||
|
private static final AtomicIntegerFieldUpdater<DefaultChannelConfig> AUTOREAD_UPDATER;
|
||||||
|
|
||||||
|
static {
|
||||||
|
AtomicIntegerFieldUpdater<DefaultChannelConfig> autoReadUpdater =
|
||||||
|
PlatformDependent.newAtomicIntegerFieldUpdater(DefaultChannelConfig.class, "autoRead");
|
||||||
|
if (autoReadUpdater == null) {
|
||||||
|
autoReadUpdater = AtomicIntegerFieldUpdater.newUpdater(DefaultChannelConfig.class, "autoRead");
|
||||||
|
}
|
||||||
|
AUTOREAD_UPDATER = autoReadUpdater;
|
||||||
|
}
|
||||||
|
|
||||||
protected final Channel channel;
|
protected final Channel channel;
|
||||||
|
|
||||||
private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
|
private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
|
||||||
@ -44,7 +57,8 @@ public class DefaultChannelConfig implements ChannelConfig {
|
|||||||
private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
|
private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
|
||||||
private volatile int maxMessagesPerRead;
|
private volatile int maxMessagesPerRead;
|
||||||
private volatile int writeSpinCount = 16;
|
private volatile int writeSpinCount = 16;
|
||||||
private volatile boolean autoRead = true;
|
@SuppressWarnings("FieldMayBeFinal")
|
||||||
|
private volatile int autoRead = 1;
|
||||||
private volatile boolean autoClose = true;
|
private volatile boolean autoClose = true;
|
||||||
private volatile int writeBufferHighWaterMark = 64 * 1024;
|
private volatile int writeBufferHighWaterMark = 64 * 1024;
|
||||||
private volatile int writeBufferLowWaterMark = 32 * 1024;
|
private volatile int writeBufferLowWaterMark = 32 * 1024;
|
||||||
@ -257,13 +271,12 @@ public class DefaultChannelConfig implements ChannelConfig {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isAutoRead() {
|
public boolean isAutoRead() {
|
||||||
return autoRead;
|
return autoRead == 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelConfig setAutoRead(boolean autoRead) {
|
public ChannelConfig setAutoRead(boolean autoRead) {
|
||||||
boolean oldAutoRead = this.autoRead;
|
boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
|
||||||
this.autoRead = autoRead;
|
|
||||||
if (autoRead && !oldAutoRead) {
|
if (autoRead && !oldAutoRead) {
|
||||||
channel.read();
|
channel.read();
|
||||||
} else if (!autoRead && oldAutoRead) {
|
} else if (!autoRead && oldAutoRead) {
|
||||||
|
Loading…
Reference in New Issue
Block a user