[#3675] Fix livelock issue in MpscLinkedQueue

Motivation:

All read operations should be safe to execute from multiple threads which was not the case and so could produce a livelock.

Modifications:

Modify methods so these are safe to be called from multiple threads.

Result:

No more livelock.
This commit is contained in:
Norman Maurer 2015-04-21 12:24:43 +02:00
parent 7b457a48f7
commit 9f6e06dc43

View File

@ -21,10 +21,10 @@ package io.netty.util.internal;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
@ -165,11 +165,19 @@ final class MpscLinkedQueue<E> extends MpscLinkedQueueTailRef<E> implements Queu
int count = 0;
MpscLinkedQueueNode<E> n = peekNode();
for (;;) {
if (n == null) {
// If value == null it means that clearMaybe() was called on the MpscLinkedQueueNode.
if (n == null || n.value() == null) {
break;
}
MpscLinkedQueueNode<E> next = n.next();
if (n == next) {
break;
}
n = next;
if (++ count == Integer.MAX_VALUE) {
// Guard against overflow of integer.
break;
}
count ++;
n = n.next();
}
return count;
}
@ -186,40 +194,26 @@ final class MpscLinkedQueue<E> extends MpscLinkedQueueTailRef<E> implements Queu
if (n == null) {
break;
}
if (n.value() == o) {
E value = n.value();
// If value == null it means that clearMaybe() was called on the MpscLinkedQueueNode.
if (value == null) {
return false;
}
if (value == o) {
return true;
}
n = n.next();
MpscLinkedQueueNode<E> next = n.next();
if (n == next) {
break;
}
n = next;
}
return false;
}
@Override
public Iterator<E> iterator() {
return new Iterator<E>() {
private MpscLinkedQueueNode<E> node = peekNode();
@Override
public boolean hasNext() {
return node != null;
}
@Override
public E next() {
MpscLinkedQueueNode<E> node = this.node;
if (node == null) {
throw new NoSuchElementException();
}
E value = node.value();
this.node = node.next();
return value;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
return new ReadOnlyIterator<E>(toList().iterator());
}
@Override
@ -248,53 +242,46 @@ final class MpscLinkedQueue<E> extends MpscLinkedQueueTailRef<E> implements Queu
throw new NoSuchElementException();
}
private List<E> toList(int initialCapacity) {
return toList(new ArrayList<E>(initialCapacity));
}
private List<E> toList() {
return toList(new ArrayList<E>());
}
private List<E> toList(List<E> elements) {
MpscLinkedQueueNode<E> n = peekNode();
for (;;) {
if (n == null) {
break;
}
E value = n.value();
if (value == null) {
break;
}
if (!elements.add(value)) {
// Seems like there is no space left, break here.
break;
}
MpscLinkedQueueNode<E> next = n.next();
if (n == next) {
break;
}
n = next;
}
return elements;
}
@Override
public Object[] toArray() {
final Object[] array = new Object[size()];
final Iterator<E> it = iterator();
for (int i = 0; i < array.length; i ++) {
if (it.hasNext()) {
array[i] = it.next();
} else {
return Arrays.copyOf(array, i);
}
}
return array;
return toList().toArray();
}
@Override
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
final int size = size();
final T[] array;
if (a.length >= size) {
array = a;
} else {
array = (T[]) Array.newInstance(a.getClass().getComponentType(), size);
}
final Iterator<E> it = iterator();
for (int i = 0; i < array.length; i++) {
if (it.hasNext()) {
array[i] = (T) it.next();
} else {
if (a == array) {
array[i] = null;
return array;
}
if (a.length < i) {
return Arrays.copyOf(array, i);
}
System.arraycopy(array, 0, a, 0, i);
if (a.length > i) {
a[i] = null;
}
return a;
}
}
return array;
return toList(a.length).toArray(a);
}
@Override