/* * Written by Cliff Click and released to the public domain, as explained at * http://creativecommons.org/licenses/publicdomain */ package it.tdlight.util; import java.io.IOException; import java.io.Serializable; import java.lang.reflect.Field; import java.util.AbstractSet; import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicInteger; import org.jctools.maps.ConcurrentAutoTable; import org.jctools.util.UnsafeAccess; import sun.misc.Unsafe; /** * A multi-threaded bit-vector set, implemented as an array of primitive * {@code longs}. All operations are non-blocking and multi-threaded safe. * {@link #contains(int)} calls are roughly the same speed as a {load, mask} * sequence. {@link #add(int)} and {@link #remove(int)} calls are a tad more * expensive than a {load, mask, store} sequence because they must use a CAS. * The bit-vector is auto-sizing. * *

General note of caution: The Set API allows the use of {@link Integer} * with silent autoboxing - which can be very expensive if many calls are * being made. Since autoboxing is silent you may not be aware that this is * going on. The built-in API takes lower-case {@code ints} and is much more * efficient. * *

Space: space is used in proportion to the largest element, as opposed to * the number of elements (as is the case with hash-table based Set * implementations). Space is approximately (largest_element/8 + 64) bytes. * * The implementation is a simple bit-vector using CAS for update. * * @since 1.5 * @author Cliff Click */ public class NonBlockingSetInt extends AbstractSet implements Serializable { private static final long serialVersionUID = 1234123412341234123L; private static final Unsafe _unsafe = UnsafeAccess.UNSAFE; // --- Bits to allow atomic update of the NBSI private static final long _nbsi_offset; static { // Field f = null; try { f = NonBlockingSetInt.class.getDeclaredField("_nbsi"); } catch( NoSuchFieldException e ) { } _nbsi_offset = _unsafe.objectFieldOffset(f); } private final boolean CAS_nbsi( NBSI old, NBSI nnn ) { return _unsafe.compareAndSwapObject(this, _nbsi_offset, old, nnn ); } // The actual Set of Joy, which changes during a resize event. The // Only Field for this class, so I can atomically change the entire // set implementation with a single CAS. private transient NBSI _nbsi; /** Create a new empty bit-vector */ public NonBlockingSetInt( ) { _nbsi = new NBSI(63, new ConcurrentAutoTable(), this); // The initial 1-word set } private NonBlockingSetInt(NonBlockingSetInt a, NonBlockingSetInt b) { _nbsi = new NBSI(a._nbsi,b._nbsi,new ConcurrentAutoTable(),this); } /** * Overridden to avoid auto-boxing for NonBlockingSetInt. * * @param c The collection to add to this set. * @return True if the set was modified. */ @Override public boolean addAll(Collection c) { if (!NonBlockingSetInt.class.equals(c.getClass())) { return super.addAll(c); } boolean modified = false; for (final IntIterator it = ((NonBlockingSetInt)c).intIterator(); it.hasNext(); ) { modified |= add(it.next()); } return modified; } /** * Overridden to avoid auto-boxing for NonBlockingSetInt. * * @param c The collection to remove from this set. * @return True if the set was modified. 
*/
  @Override
  public boolean removeAll(Collection c) {
    if (!c.getClass().equals(NonBlockingSetInt.class)) {
      return super.removeAll(c);
    }
    final IntIterator victims = ((NonBlockingSetInt) c).intIterator();
    boolean changed = false;
    while (victims.hasNext()) {
      if (remove(victims.next())) {
        changed = true;
      }
    }
    return changed;
  }

  /**
   * Bulk membership test. When the argument is exactly a NonBlockingSetInt its
   * elements are walked with the primitive iterator, so nothing is boxed;
   * any other collection is handed to the superclass.
   *
   * @param c The collection whose elements are looked up in this set.
   * @return True if every element of {@code c} is in this set.
   */
  @Override
  public boolean containsAll(Collection c) {
    if (!c.getClass().equals(NonBlockingSetInt.class)) {
      return super.containsAll(c);
    }
    final IntIterator wanted = ((NonBlockingSetInt) c).intIterator();
    while (wanted.hasNext()) {
      if (!contains(wanted.next())) {
        return false;
      }
    }
    return true;
  }

  /**
   * Keep only the elements that are also in {@code c}. When the argument is
   * exactly a NonBlockingSetInt this set is walked with the primitive iterator
   * and each element missing from {@code c} is removed through that iterator;
   * any other collection is handed to the superclass.
   *
   * @param c The collection of elements to keep.
   * @return True if the set was modified.
   */
  @Override
  public boolean retainAll(Collection c) {
    if (!c.getClass().equals(NonBlockingSetInt.class)) {
      return super.retainAll(c);
    }
    final NonBlockingSetInt keep = (NonBlockingSetInt) c;
    final IntIterator mine = intIterator();
    boolean changed = false;
    while (mine.hasNext()) {
      if (keep.contains(mine.next())) {
        continue;
      }
      mine.remove();
      changed = true;
    }
    return changed;
  }

  /**
   * Sum of the elements currently in the set, computed with the primitive
   * iterator so nothing is boxed.
   *
   * @return the sum of all elements (wrapping on int overflow).
   */
  @Override
  public int hashCode() {
    int sum = 0;
    final IntIterator elements = intIterator();
    while (elements.hasNext()) {
      sum += elements.next();
    }
    return sum;
  }

  /**
   * Add {@code i} to the set. This is the boxed {@link Integer} flavour of add
   * and pays for an unboxing; prefer the {@code int} version,
   * {@link #add(int)}, where possible.
   * @throws IllegalArgumentException if i is negative.
   * @return true if i was added to the set.
   */
  @Override
  public boolean add ( final Integer i ) {
    final int value = i.intValue();
    return add(value);
  }

  /**
   * Test if {@code o} is in the set. This is the boxed {@link Integer} flavour
   * of contains and pays for a type-check and an unboxing; prefer the
   * {@code int} version, {@link #contains(int)}, where possible.
   * @return true if {@code o} is an Integer that is in the set.
   */
  @Override
  public boolean contains( final Object o ) {
    if (!(o instanceof Integer)) {
      return false;
    }
    return contains(((Integer) o).intValue());
  }

  /**
   * Remove {@code o} from the set.
This is the uppercase {@link Integer} * version of remove, requires a type-check and auto-unboxing. When * possible use the {@code int} version of {@link #remove(int)} for * efficiency. * @return true if i was removed to the set. */ @Override public boolean remove( final Object o ) { return o instanceof Integer && remove(((Integer) o).intValue()); } /** * Add {@code i} to the set. This is the lower-case '{@code int}' version * of {@link #add} - no autoboxing. Negative values throw * IllegalArgumentException. * @throws IllegalArgumentException if i is negative. * @return true if i was added to the set. */ public boolean add( final int i ) { if( i < 0 ) throw new IllegalArgumentException(""+i); return _nbsi.add(i); } /** * Test if {@code i} is in the set. This is the lower-case '{@code int}' * version of {@link #contains} - no autoboxing. * @return true if i was int the set. */ public boolean contains( final int i ) { return i >= 0 && _nbsi.contains(i); } /** * Remove {@code i} from the set. This is the fast lower-case '{@code int}' * version of {@link #remove} - no autoboxing. * @return true if i was added to the set. */ public boolean remove ( final int i ) { return i >= 0 && _nbsi.remove(i); } /** * Current count of elements in the set. Due to concurrent racing updates, * the size is only ever approximate. Updates due to the calling thread are * immediately visible to calling thread. * @return count of elements. */ @Override public int size ( ) { return _nbsi.size( ); } /** Empty the bitvector. 
*/ @Override public void clear ( ) { NBSI cleared = new NBSI(63, new ConcurrentAutoTable(), this); // An empty initial NBSI while( !CAS_nbsi( _nbsi, cleared ) ) // Spin until clear works ; } @Override public String toString() { // Overloaded to avoid auto-boxing final IntIterator it = intIterator(); if (!it.hasNext()) { return "[]"; } final StringBuilder sb = new StringBuilder().append('['); for (;;) { sb.append(it.next()); if (!it.hasNext()) { return sb.append(']').toString(); } sb.append(", "); } } public int sizeInBytes() { return _nbsi.sizeInBytes(); } /***************************************************************** * * bitwise comparisons optimised for NBSI * *****************************************************************/ public NonBlockingSetInt intersect(final NonBlockingSetInt op) { NonBlockingSetInt res = new NonBlockingSetInt(this,op); res._nbsi.intersect(res._nbsi, this._nbsi, op._nbsi); return res; } public NonBlockingSetInt union(final NonBlockingSetInt op) { NonBlockingSetInt res = new NonBlockingSetInt(this,op); res._nbsi.union(res._nbsi, this._nbsi, op._nbsi); return res; } // public NonBlockingSetInt not(final NonBlockingSetInt op) { // // } /** Verbose printout of internal structure for debugging. */ public void print() { _nbsi.print(0); } /** * Standard Java {@link Iterator}. Not very efficient because it * auto-boxes the returned values. */ @Override public Iterator iterator( ) { return new iter(); } public IntIterator intIterator() { return new NBSIIntIterator(); } private class NBSIIntIterator implements IntIterator { NBSI nbsi; int index = -1; int prev = -1; NBSIIntIterator() { nbsi = _nbsi; advance(); } private void advance() { while( true ) { index++; // Next index while( (index>>6) >= nbsi._bits.length ) { // Index out of range? if( nbsi._new == null ) { // New table? 
index = -2; // No, so must be all done return; // } nbsi = nbsi._new; // Carry on, in the new table } if( nbsi.contains(index) ) return; } } @Override public int next() { if( index == -1 ) throw new NoSuchElementException(); prev = index; advance(); return prev; } @Override public boolean hasNext() { return index != -2; } @Override public void remove() { if( prev == -1 ) throw new IllegalStateException(); nbsi.remove(prev); prev = -1; } } private class iter implements Iterator { NBSIIntIterator intIterator; iter() { intIterator = new NBSIIntIterator(); } @Override public boolean hasNext() { return intIterator.hasNext(); } @Override public Integer next() { return intIterator.next(); } @Override public void remove() { intIterator.remove(); } } // --- writeObject ------------------------------------------------------- // Write a NBSI to a stream private void writeObject(java.io.ObjectOutputStream s) throws IOException { s.defaultWriteObject(); // Nothing to write final NBSI nbsi = _nbsi; // The One Field is transient final int len = _nbsi._bits.length<<6; s.writeInt(len); // Write max element for( int i=0; i= 0 && idx < ary.length; return _Lbase + idx * _Lscale; } private final boolean CAS( int idx, long old, long nnn ) { return _unsafe.compareAndSwapLong( _bits, rawIndex(_bits, idx), old, nnn ); } // --- Resize // The New Table, only set once to non-zero during a resize. // Must be atomically set. 
private NBSI _new; private static final long _new_offset; static { // Field f = null; try { f = NBSI.class.getDeclaredField("_new"); } catch( NoSuchFieldException e ) { } _new_offset = _unsafe.objectFieldOffset(f); } private final boolean CAS_new( NBSI nnn ) { return _unsafe.compareAndSwapObject(this, _new_offset, null, nnn ); } private transient final AtomicInteger _copyIdx; // Used to count bits started copying private transient final AtomicInteger _copyDone; // Used to count words copied in a resize operation private transient final int _sum_bits_length; // Sum of all nested _bits.lengths private static final long mask( int i ) { return 1L<<(i&63); } // I need 1 free bit out of 64 to allow for resize. I do this by stealing // the high order bit - but then I need to do something with adding element // number 63 (and friends). I could use a mod63 function but it's more // efficient to handle the mod-64 case as an exception. // // Every 64th bit is put in it's own recursive bitvector. If the low 6 bits // are all set, we shift them off and recursively operate on the _nbsi64 set. private final NBSI _nbsi64; private NBSI(int max_elem, ConcurrentAutoTable ctr, NonBlockingSetInt nonb ) { super(); _non_blocking_set_int = nonb; _size = ctr; _copyIdx = ctr == null ? null : new AtomicInteger(); _copyDone = ctr == null ? null : new AtomicInteger(); // The main array of bits _bits = new long[(int)(((long)max_elem+63)>>>6)]; // Every 64th bit is moved off to it's own subarray, so that the // sign-bit is free for other purposes _nbsi64 = ((max_elem+1)>>>6) == 0 ? null : new NBSI((max_elem+1)>>>6, null, null); _sum_bits_length = _bits.length + (_nbsi64==null ? 0 : _nbsi64._sum_bits_length); } /** built a new NBSI with buffers large enough to hold bitwise operations on the operands **/ private NBSI(NBSI a, NBSI b, ConcurrentAutoTable ctr, NonBlockingSetInt nonb) { super(); _non_blocking_set_int = nonb; _size = ctr; _copyIdx = ctr == null ? 
null : new AtomicInteger(); _copyDone = ctr == null ? null : new AtomicInteger(); if(!has_bits(a) && !has_bits(b)) { _bits = null; _nbsi64 = null; _sum_bits_length = 0; return; } // todo - clean this nastiness up // essentially just safely creates new empty buffers for each of the recursive bitsets if(!has_bits(a)) { _bits = new long[b._bits.length]; _nbsi64 = new NBSI(null,b._nbsi64,null,null); } else if(!has_bits(b)) { _bits = new long[a._bits.length]; _nbsi64 = new NBSI(null,a._nbsi64,null,null); } else { int bit_length = a._bits.length > b._bits.length ? a._bits.length : b._bits.length; _bits = new long[bit_length]; _nbsi64 = new NBSI(a._nbsi64,b._nbsi64,null,null); } _sum_bits_length = _bits.length + _nbsi64._sum_bits_length; } private static boolean has_bits(NBSI n) { return n != null && n._bits != null; } // Lower-case 'int' versions - no autoboxing, very fast. // 'i' is known positive. public boolean add( final int i ) { // Check for out-of-range for the current size bit vector. // If so we need to grow the bit vector. if( (i>>6) >= _bits.length ) return install_larger_new_bits(i). // Install larger pile-o-bits (duh) help_copy().add(i); // Finally, add to the new table // Handle every 64th bit via using a nested array NBSI nbsi = this; // The bit array being added into int j = i; // The bit index being added while( (j&63) == 63 ) { // Bit 64? (low 6 bits are all set) nbsi = nbsi._nbsi64; // Recurse j = j>>6; // Strip off low 6 bits (all set) } final long mask = mask(j); long old; do { old = nbsi._bits[j>>6]; // Read old bits if( old < 0 ) // Not mutable? // Not mutable: finish copy of word, and retry on copied word return help_copy_impl(i).help_copy().add(i); if( (old & mask) != 0 ) return false; // Bit is already set? } while( !nbsi.CAS( j>>6, old, old | mask ) ); _size.add(1); return true; } public boolean remove( final int i ) { if( (i>>6) >= _bits.length ) // Out of bounds? Not in this array! return _new==null ? 
false : help_copy().remove(i); // Handle every 64th bit via using a nested array NBSI nbsi = this; // The bit array being added into int j = i; // The bit index being added while( (j&63) == 63 ) { // Bit 64? (low 6 bits are all set) nbsi = nbsi._nbsi64; // Recurse j = j>>6; // Strip off low 6 bits (all set) } final long mask = mask(j); long old; do { old = nbsi._bits[j>>6]; // Read old bits if( old < 0 ) // Not mutable? // Not mutable: finish copy of word, and retry on copied word return help_copy_impl(i).help_copy().remove(i); if( (old & mask) == 0 ) return false; // Bit is already clear? } while( !nbsi.CAS( j>>6, old, old & ~mask ) ); _size.add(-1); return true; } public boolean contains( final int i ) { if( (i>>6) >= _bits.length ) // Out of bounds? Not in this array! return _new==null ? false : help_copy().contains(i); // Handle every 64th bit via using a nested array NBSI nbsi = this; // The bit array being added into int j = i; // The bit index being added while( (j&63) == 63 ) { // Bit 64? (low 6 bits are all set) nbsi = nbsi._nbsi64; // Recurse j = j>>6; // Strip off low 6 bits (all set) } final long mask = mask(j); long old = nbsi._bits[j>>6]; // Read old bits if( old < 0 ) // Not mutable? // Not mutable: finish copy of word, and retry on copied word return help_copy_impl(i).help_copy().contains(i); // Yes mutable: test & return bit return (old & mask) != 0; } /** * Bitwise operations which store the result in this instance. * Assumes that this instance contains ample buffer space to store the largest * buffer from each NBSI in the recursive bitmap. * * Also assumes that this method is called during the construction process of * the bitset before the instance could be leaked to multiple threads. 
***/ public boolean intersect(NBSI dest, NBSI a, NBSI b) { // terminate recursion if one bitset is missing data // since that word should be left as 0L anyway if(!has_bits(a) || !has_bits(b)) return true; for(int i = 0; i < dest._bits.length; i++) { long left = a.safe_read_word(i,0L); long right = b.safe_read_word(i,0L); dest._bits[i] = (left & right) & Long.MAX_VALUE; // mask sign bit } // todo - recompute size return intersect(dest._nbsi64, a._nbsi64, b._nbsi64); } public boolean union(NBSI dest, NBSI a, NBSI b) { // terminate recursion if neiter bitset has data if(!has_bits(a) && !has_bits(b)) return true; if(has_bits(a) || has_bits(b)) { for(int i = 0; i < dest._bits.length; i++) { long left = a == null ? 0L : a.safe_read_word(i,0); long right = b == null ? 0L : b.safe_read_word(i,0); dest._bits[i] = (left | right) & Long.MAX_VALUE; } } return union(dest._nbsi64, a == null ? null : a._nbsi64, b == null ? null : b._nbsi64); } /**************************************************************************/ private long safe_read_word(int i, long default_word) { if(i >= _bits.length) { // allow reading past the end of the buffer filling in a default word return default_word; } long word = _bits[i]; if(word < 0) { NBSI nb = help_copy_impl(i); if(nb._non_blocking_set_int == null) { return default_word; } word = nb.help_copy()._bits[i]; } return word; } public int sizeInBytes() { return (int)_bits.length; } public int size() { return (int)_size.get(); } // Must grow the current array to hold an element of size i private NBSI install_larger_new_bits( final int i ) { if( _new == null ) { // Grow by powers of 2, to avoid minor grow-by-1's. // Note: must grow by exact powers-of-2 or the by-64-bit trick doesn't work right int sz = (_bits.length<<6)<<1; // CAS to install a new larger size. Did it work? Did it fail? We // don't know and don't care. 
Only One can be installed, so if // another thread installed a too-small size, we can't help it - we // must simply install our new larger size as a nested-resize table. CAS_new(new NBSI(sz, _size, _non_blocking_set_int)); } // Return self for 'fluid' programming style return this; } // Help any top-level NBSI to copy until completed. // Always return the _new version of *this* NBSI, in case we're nested. private NBSI help_copy() { // Pick some words to help with - but only help copy the top-level NBSI. // Nested NBSI waits until the top is done before we start helping. NBSI top_nbsi = _non_blocking_set_int._nbsi; final int HELP = 8; // Tuning number: how much copy pain are we willing to inflict? // We "help" by forcing individual bit indices to copy. However, bits // come in lumps of 64 per word, so we just advance the bit counter by 64's. int idx = top_nbsi._copyIdx.getAndAdd(64*HELP); for( int i=0; i>6; // Strip off low 6 bits (all set) } // Transit from state 1: word is not immutable yet // Immutable is in bit 63, the sign bit. long bits = old._bits[j>>6]; while( bits >= 0 ) { // Still in state (1)? long oldbits = bits; bits |= mask(63); // Target state of bits: sign-bit means immutable if( old.CAS( j>>6, oldbits, bits ) ) { if( oldbits == 0 ) _copyDone.addAndGet(1); break; // Success - old array word is now immutable } bits = old._bits[j>>6]; // Retry if CAS failed } // Transit from state 2: non-zero in old and zero in new if( bits != mask(63) ) { // Non-zero in old? long new_bits = nnn._bits[j>>6]; if( new_bits == 0 ) { // New array is still zero new_bits = bits & ~mask(63); // Desired new value: a mutable copy of bits // One-shot CAS attempt, no loop, from 0 to non-zero. 
// If it fails, somebody else did the copy for us if( !nnn.CAS( j>>6, 0, new_bits ) ) new_bits = nnn._bits[j>>6]; // Since it failed, get the new value assert new_bits != 0; } // Transit from state 3: non-zero in old and non-zero in new // One-shot CAS attempt, no loop, from non-zero to 0 (but immutable) if( old.CAS( j>>6, bits, mask(63) ) ) _copyDone.addAndGet(1); // One more word finished copying } // Now in state 4: zero (and immutable) in old // Return the self bitvector for 'fluid' programming style return this; } private void print( int d, String msg ) { for( int i=0; i