Update natives

This commit is contained in:
Andrea Cavalli 2023-05-10 15:26:11 +02:00
parent a08c987758
commit 17849d6b58
22 changed files with 1196 additions and 98 deletions

View File

@ -8,8 +8,8 @@
<name>TDLight Java BOM</name>
<properties>
<revision>3.0.0.0-SNAPSHOT</revision>
<tdlight.natives.version>4.0.376</tdlight.natives.version>
<tdlight.api.version>4.0.374</tdlight.api.version>
<tdlight.natives.version>4.0.377</tdlight.natives.version>
<tdlight.api.version>4.0.375</tdlight.api.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

View File

@ -2,18 +2,18 @@ package it.tdlight.example;
import it.tdlight.Init;
import it.tdlight.TelegramClient;
import it.tdlight.util.CantLoadLibrary;
import it.tdlight.jni.TdApi;
import it.tdlight.ClientFactory;
import it.tdlight.util.UnsupportedNativeLibraryException;
/**
* This is an advanced example that uses directly the native client without using the SimpleClient implementation
*/
public class AdvancedExample {
public static void main(String[] args) throws CantLoadLibrary {
public static void main(String[] args) throws UnsupportedNativeLibraryException {
// Initialize TDLight native libraries
Init.start();
Init.init();
// Create a client manager, it should be closed before shutdown
ClientFactory clientManager = new ClientFactory();

View File

@ -9,8 +9,8 @@ import it.tdlight.Init;
import it.tdlight.jni.TdApi.AuthorizationState;
import it.tdlight.jni.TdApi.Chat;
import it.tdlight.jni.TdApi.MessageContent;
import it.tdlight.util.CantLoadLibrary;
import it.tdlight.jni.TdApi;
import it.tdlight.util.UnsupportedNativeLibraryException;
import java.nio.file.Path;
import java.nio.file.Paths;
@ -28,9 +28,9 @@ public final class Example {
private static SimpleTelegramClient client;
public static void main(String[] args) throws CantLoadLibrary, InterruptedException {
public static void main(String[] args) throws UnsupportedNativeLibraryException, InterruptedException {
// Initialize TDLight native libraries
Init.start();
Init.init();
// Create the client factory
try (SimpleTelegramClientFactory clientFactory = new SimpleTelegramClientFactory()) {

View File

@ -79,6 +79,11 @@
<optional>true</optional>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
<version>4.0.1</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>

View File

@ -1,9 +1,29 @@
package it.tdlight;
import it.tdlight.util.IntSwapper;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
class ArrayUtil {
public static int[] copyFromCollection(Collection<Integer> list) {
int[] result = new int[list.size()];
int i = 0;
for (Integer item : list) {
result[i++] = item;
}
return result;
}
public static Set<Integer> toSet(int[] list) {
Set<Integer> set = new HashSet<>(list.length);
for (int item : list) {
set.add(item);
}
return set;
}
public interface IntComparator {
int compare(int k1, int k2);
}

View File

@ -0,0 +1,57 @@
package it.tdlight;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.Object;
import it.tdlight.jni.TdApi.Update;
import it.tdlight.util.CleanSupport;
import it.tdlight.util.CleanSupport.CleanableSupport;
import java.util.function.LongSupplier;
class AutoCleaningTelegramClient implements TelegramClient {
private final TelegramClient client;
private volatile CleanableSupport cleanable;
AutoCleaningTelegramClient(InternalClientsState state) {
this.client = new InternalClient(state, this::onClientRegistered);
}
public void onClientRegistered(int clientId, LongSupplier nextQueryIdSupplier) {
Runnable shutDown = () -> NativeClientAccess.send(clientId, nextQueryIdSupplier.getAsLong(), new TdApi.Close());
Thread shutdownHook = new Thread(shutDown);
Runtime.getRuntime().addShutdownHook(shutdownHook);
cleanable = CleanSupport.register(this, shutDown);
}
@Override
public void initialize(UpdatesHandler updatesHandler,
ExceptionHandler updateExceptionHandler,
ExceptionHandler defaultExceptionHandler) {
client.initialize(updatesHandler, updateExceptionHandler, defaultExceptionHandler);
}
@Override
public void initialize(ResultHandler<Update> updateHandler,
ExceptionHandler updateExceptionHandler,
ExceptionHandler defaultExceptionHandler) {
client.initialize(updateHandler, updateExceptionHandler, defaultExceptionHandler);
}
@Override
public <R extends Object> void send(Function<R> query,
ResultHandler<R> resultHandler,
ExceptionHandler exceptionHandler) {
client.send(query, resultHandler, exceptionHandler);
}
@Override
public <R extends Object> void send(Function<R> query, ResultHandler<R> resultHandler) {
client.send(query, resultHandler);
}
@Override
public <R extends Object> Object execute(Function<R> query) {
return client.execute(query);
}
}

View File

@ -59,7 +59,7 @@ public class ClientFactory implements AutoCloseable {
}
public TelegramClient createClient() {
return new InternalClient(state);
return new AutoCleaningTelegramClient(state);
}
public ReactiveTelegramClient createReactive() {
@ -71,7 +71,7 @@ public class ClientFactory implements AutoCloseable {
try {
Init.init();
responseReceiver.start();
this.cleanable = CleanSupport.register(responseReceiver, () -> {
this.cleanable = CleanSupport.register(this, () -> {
try {
this.responseReceiver.close();
} catch (InterruptedException e) {

View File

@ -0,0 +1,7 @@
package it.tdlight;
import java.util.function.LongSupplier;
public interface ClientRegistrationEventHandler {
void onClientRegistered(int clientId, LongSupplier nextQueryIdSupplier);
}

View File

@ -6,8 +6,7 @@ import it.tdlight.jni.TdApi.Object;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jctools.maps.NonBlockingHashMapLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
@ -18,9 +17,8 @@ final class InternalClient implements ClientEventsHandler, TelegramClient {
private static final Marker TG_MARKER = MarkerFactory.getMarker("TG");
private static final Logger logger = LoggerFactory.getLogger(TelegramClient.class);
private final ConcurrentHashMap<Long, Handler<?>> handlers = new ConcurrentHashMap<>();
private final Thread shutdownHook = new Thread(this::onJVMShutdown);
private ClientRegistrationEventHandler clientRegistrationEventHandler;
private final NonBlockingHashMapLong<Handler<?>> handlers = new NonBlockingHashMapLong<>();
private volatile Integer clientId = null;
private final InternalClientsState clientManagerState;
@ -28,11 +26,13 @@ final class InternalClient implements ClientEventsHandler, TelegramClient {
private MultiHandler updatesHandler;
private ExceptionHandler defaultExceptionHandler;
private final AtomicBoolean isClosed = new AtomicBoolean();
private final java.lang.Object closeLock = new java.lang.Object();
private volatile boolean closed = false;
public InternalClient(InternalClientsState clientManagerState) {
public InternalClient(InternalClientsState clientManagerState,
ClientRegistrationEventHandler clientRegistrationEventHandler) {
this.clientManagerState = clientManagerState;
Runtime.getRuntime().addShutdownHook(shutdownHook);
this.clientRegistrationEventHandler = clientRegistrationEventHandler;
}
@Override
@ -68,20 +68,18 @@ final class InternalClient implements ClientEventsHandler, TelegramClient {
}
}
if (isClosed) {
if (this.isClosed.compareAndSet(false, true)) {
handleClose();
}
if (isClosed && !closed) {
synchronized (closeLock) {
if (!closed) {
closed = true;
handleClose();
}
}
}
}
private void handleClose() {
logger.trace(TG_MARKER, "Received close");
try {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
} catch (IllegalStateException ignored) {
logger.trace(TG_MARKER, "Can't remove shutdown hook because the JVM is already shutting down");
}
handlers.forEach((eventId, handler) ->
handleResponse(eventId, new TdApi.Error(500, "Instance closed"), handler));
handlers.clear();
@ -152,7 +150,14 @@ final class InternalClient implements ClientEventsHandler, TelegramClient {
if (clientId != null) {
throw new UnsupportedOperationException("Can't initialize the same client twice!");
}
clientId = NativeClientAccess.create();
int clientId = NativeClientAccess.create();
InternalClientsState clientManagerState = this.clientManagerState;
this.clientId = clientId;
if (clientRegistrationEventHandler != null) {
clientRegistrationEventHandler.onClientRegistered(clientId, clientManagerState::getNextQueryId);
// Remove the event handler
clientRegistrationEventHandler = null;
}
}
clientManagerState.registerClient(clientId, this);
logger.info(TG_MARKER, "Registered new client {}", clientId);
@ -166,15 +171,16 @@ final class InternalClient implements ClientEventsHandler, TelegramClient {
ResultHandler<R> resultHandler,
ExceptionHandler exceptionHandler) {
logger.trace(TG_MARKER, "Trying to send {}", query);
if (isClosedAndMaybeThrow(query)) {
resultHandler.onResult(new TdApi.Ok());
}
if (clientId == null) {
ExceptionHandler handler = exceptionHandler == null ? defaultExceptionHandler : exceptionHandler;
handler.onException(new IllegalStateException(
"Can't send a request to TDLib before calling \"initialize\" function!"));
// Handle special requests
TdApi.Object specialResult = tryHandleSpecial(query);
if (specialResult != null) {
if (resultHandler != null) {
resultHandler.onResult(specialResult);
}
return;
}
long queryId = clientManagerState.getNextQueryId();
if (resultHandler != null) {
handlers.put(queryId, new Handler<>(resultHandler, exceptionHandler));
@ -185,36 +191,31 @@ final class InternalClient implements ClientEventsHandler, TelegramClient {
@Override
public <R extends TdApi.Object> TdApi.Object execute(Function<R> query) {
logger.trace(TG_MARKER, "Trying to execute {}", query);
if (isClosedAndMaybeThrow(query)) {
return new TdApi.Ok();
}
return NativeClientAccess.execute(query);
}
private void onJVMShutdown() {
if ("true".equalsIgnoreCase(System.getProperty("it.tdlight.enableShutdownHooks", "true"))) {
try {
logger.info(TG_MARKER, "Client {} is shutting down because the JVM is shutting down", clientId);
this.send(new TdApi.Close(), result -> {}, ex -> {});
} catch (Throwable ex) {
logger.debug("Failed to send shutdown request to session {}", clientId);
}
// Handle special requests
TdApi.Object specialResult = tryHandleSpecial(query);
if (specialResult != null) {
return specialResult;
}
return NativeClientAccess.execute(query);
}
/**
* @param function function used to check if the check will be enforced or not. Can be null
* @return true if closed
* @return not null if closed. The result, if present, must be sent to the client
*/
private boolean isClosedAndMaybeThrow(Function<?> function) {
boolean closed = isClosed.get();
if (closed) {
private <R extends TdApi.Object> TdApi.Object tryHandleSpecial(Function<R> function) {
if (this.closed) {
if (function != null && function.getConstructor() == TdApi.Close.CONSTRUCTOR) {
return true;
return new TdApi.Ok();
} else {
throw new IllegalStateException("The client is closed!");
return new TdApi.Error(503, "Client closed");
}
} else if (clientId == null) {
return new TdApi.Error(503, "Client not initialized. TDLib is not available until \"initialize\" is called!");
} else {
return null;
}
return false;
}
}

View File

@ -1,6 +1,6 @@
package it.tdlight;
import java.util.concurrent.ConcurrentHashMap;
import org.jctools.maps.NonBlockingHashMapLong;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -12,7 +12,7 @@ public class InternalClientsState {
static final int STATE_STOPPED = 4;
private final AtomicInteger runState = new AtomicInteger();
private final AtomicLong currentQueryId = new AtomicLong();
private final ConcurrentHashMap<Integer, ClientEventsHandler> registeredClientEventHandlers = new ConcurrentHashMap<>();
private final NonBlockingHashMapLong<ClientEventsHandler> registeredClientEventHandlers = new NonBlockingHashMapLong<>();
public long getNextQueryId() {

View File

@ -5,8 +5,6 @@ import it.tdlight.jni.TdApi.Error;
import it.tdlight.jni.TdApi.Function;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -14,6 +12,8 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import it.tdlight.util.NonBlockingHashSetLong;
import org.jctools.maps.NonBlockingHashMapLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
@ -27,9 +27,14 @@ final class InternalReactiveClient implements ClientEventsHandler, ReactiveTeleg
private static final Logger logger = LoggerFactory.getLogger(InternalReactiveClient.class);
private static final Handler<?> EMPTY_HANDLER = new Handler<>(r -> {}, ex -> {});
private final ConcurrentHashMap<Long, Handler<?>> handlers = new ConcurrentHashMap<>();
private final Set<Long> timedOutHandlers = new ConcurrentHashMap<Long, Object>().keySet(new Object());
private final ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
private final NonBlockingHashMapLong<Handler<?>> handlers = new NonBlockingHashMapLong<>();
private final NonBlockingHashSetLong timedOutHandlers = new NonBlockingHashSetLong();
private final ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r);
t.setName("TDLight-Timers");
t.setDaemon(true);
return t;
});
private final ExceptionHandler defaultExceptionHandler;
private final Handler<TdApi.Update> updateHandler;
@ -39,9 +44,7 @@ final class InternalReactiveClient implements ClientEventsHandler, ReactiveTeleg
private final InternalClientsState clientManagerState;
private final AtomicBoolean alreadyReceivedClosed = new AtomicBoolean();
// This field is not volatile, but it's not problematic, because ReplayStartupUpdatesListener is able to forward
// updates to the right listener
private SignalListener signalListener = new ReplayStartupUpdatesListener();
private volatile SignalListener signalListener = new ReplayStartupUpdatesListener();
public InternalReactiveClient(InternalClientsState clientManagerState) {
this.clientManagerState = clientManagerState;

View File

@ -3,18 +3,17 @@ package it.tdlight;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import it.tdlight.util.SimpleIntQueue;
import it.tdlight.util.IntSwapper;
import it.tdlight.util.SpinWaitSupport;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
abstract class ResponseReceiver extends Thread implements AutoCloseable {
@ -42,8 +41,9 @@ abstract class ResponseReceiver extends Thread implements AutoCloseable {
private int clientEventsLastUsedLength = 0;
private final CountDownLatch closeWait = new CountDownLatch(1);
private final Set<Integer> registeredClients = new ConcurrentHashMap<Integer, java.lang.Object>()
.keySet(new java.lang.Object());
private final Object registeredClientsLock = new Object();
// Do not modify the int[] directly, this should be replaced
private volatile int[] registeredClients = new int[0];
public ResponseReceiver(EventsHandler eventsHandler) {
@ -60,9 +60,10 @@ abstract class ResponseReceiver extends Thread implements AutoCloseable {
@Override
public void run() {
int[] sortIndex;
final SimpleIntQueue closedClients = new SimpleIntQueue();
try {
boolean interrupted;
while (!(interrupted = Thread.interrupted()) && !registeredClients.isEmpty()) {
while (!(interrupted = Thread.interrupted()) && registeredClients.length > 0) {
// Timeout is expressed in seconds
int resultsCount = receive(clientIds, eventIds, events, 2.0);
@ -71,7 +72,7 @@ abstract class ResponseReceiver extends Thread implements AutoCloseable {
continue;
}
Set<Integer> closedClients = new HashSet<>();
closedClients.reset();
if (USE_OPTIMIZED_DISPATCHER) {
// Generate a list of indices sorted by client id, from 0 to resultsCount
@ -197,13 +198,17 @@ abstract class ResponseReceiver extends Thread implements AutoCloseable {
cleanEventsArray(resultsCount);
if (!closedClients.isEmpty()) {
this.registeredClients.removeAll(closedClients);
if (closedClients.isContentful()) {
synchronized (this.registeredClientsLock) {
Set<Integer> remainingRegisteredClients = ArrayUtil.toSet(this.registeredClients);
closedClients.drain(remainingRegisteredClients::remove);
this.registeredClients = ArrayUtil.copyFromCollection(remainingRegisteredClients);
}
}
}
if (interrupted) {
for (Integer clientId : this.registeredClients) {
for (int clientId : registeredClients) {
eventsHandler.handleClientEvents(clientId, true, clientEventIds, clientEvents, 0, 0);
}
}
@ -250,13 +255,17 @@ abstract class ResponseReceiver extends Thread implements AutoCloseable {
}
public void registerClient(int clientId) {
registeredClients.add(clientId);
synchronized (registeredClientsLock) {
Set<Integer> modifiableRegisteredClients = ArrayUtil.toSet(this.registeredClients);
modifiableRegisteredClients.add(clientId);
this.registeredClients = ArrayUtil.copyFromCollection(modifiableRegisteredClients);
}
}
@Override
public void close() throws InterruptedException {
this.closeWait.await();
if (registeredClients.isEmpty()) {
if (registeredClients.length == 0) {
ResponseReceiver.this.interrupt();
}
}

View File

@ -19,9 +19,10 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import org.jctools.maps.NonBlockingHashMap;
import org.jctools.maps.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -43,13 +44,10 @@ public final class SimpleTelegramClient implements Authenticable, MutableTelegra
private final TDLibSettings settings;
private AuthenticationSupplier<?> authenticationData;
private final Map<String, Set<CommandHandler>> commandHandlers = new ConcurrentHashMap<>();
private final Set<ResultHandler<TdApi.Update>> updateHandlers = new ConcurrentHashMap<ResultHandler<TdApi.Update>, Object>().keySet(
new Object());
private final Set<ExceptionHandler> updateExceptionHandlers = new ConcurrentHashMap<ExceptionHandler, Object>().keySet(
new Object());
private final Set<ExceptionHandler> defaultExceptionHandlers = new ConcurrentHashMap<ExceptionHandler, Object>().keySet(
new Object());
private final Map<String, Set<CommandHandler>> commandHandlers = new NonBlockingHashMap<>();
private final Set<ResultHandler<TdApi.Update>> updateHandlers = new NonBlockingHashSet<>();
private final Set<ExceptionHandler> updateExceptionHandlers = new NonBlockingHashSet<>();
private final Set<ExceptionHandler> defaultExceptionHandlers = new NonBlockingHashSet<>();
private final AuthorizationStateReadyGetMe meGetter;
private final AuthorizationStateReadyLoadChats mainChatsLoader;
@ -71,11 +69,8 @@ public final class SimpleTelegramClient implements Authenticable, MutableTelegra
this.updateHandlers.addAll(updateHandlers);
this.updateExceptionHandlers.addAll(updateExceptionHandlers);
this.defaultExceptionHandlers.addAll(defaultExceptionHandlers);
if (clientInteraction != null) {
this.clientInteraction = clientInteraction;
} else {
this.clientInteraction = new ScannerClientInteraction(SequentialRequestsExecutor.getInstance(), this);
}
this.clientInteraction = clientInteraction != null ? clientInteraction
: new ScannerClientInteraction(SequentialRequestsExecutor.getInstance(), this);
this.addUpdateHandler(TdApi.UpdateAuthorizationState.class,
@ -164,9 +159,7 @@ public final class SimpleTelegramClient implements Authenticable, MutableTelegra
@Override
public <T extends TdApi.Update> void addCommandHandler(String commandName, CommandHandler handler) {
Set<CommandHandler> handlers = this.commandHandlers.computeIfAbsent(commandName,
k -> new ConcurrentHashMap<CommandHandler, Object>().keySet(new Object())
);
Set<CommandHandler> handlers = this.commandHandlers.computeIfAbsent(commandName, k -> new NonBlockingHashSet<>());
handlers.add(handler);
}

View File

@ -1,9 +1,25 @@
package it.tdlight.util;
import java.util.concurrent.atomic.AtomicBoolean;
public class CleanSupport {
public static CleanableSupport register(Object object, Runnable cleanAction) {
return cleanAction::run;
//noinspection removal
return new CleanableSupport() {
private final AtomicBoolean clean = new AtomicBoolean(false);
@Override
public void clean() {
if (clean.compareAndSet(false, true)) {
cleanAction.run();
}
}
@Override
protected void finalize() {
this.clean();
}
};
}

View File

@ -0,0 +1,25 @@
package it.tdlight.util;
/**
* An iterator optimized for primitive collections which avoids auto-boxing on {@link #next()}.
*/
public interface IntIterator {
/**
* Identical to {@link java.util.Iterator#next()} but avoids auto-boxing.
*
* @return The next int in the collection.
*/
int next();
/**
* Identical to {@link java.util.Iterator#hasNext()}.
*
* @return True if the iterator has more elements.
*/
boolean hasNext();
/**
* Identical to {@link java.util.Iterator#remove()}.
*/
void remove();
}

View File

@ -0,0 +1,16 @@
package it.tdlight.util;
import java.util.Iterator;
/**
* An extension of the standard {@link Iterator} interface which provides the {@link #nextLong()} method to avoid
* auto-boxing of results as they are returned.
* */
public interface LongIterator extends Iterator<Long> {
/**
* Returns the next long value without auto-boxing. Using this is preferred to {@link #next()}.
*
* @return The next long value.
*/
long nextLong();
}

View File

@ -86,13 +86,13 @@ public final class Native {
private static Stream<String> getNormalizedArchitectures(String os, String arch) {
switch (os) {
case "linux": {
return Stream.of("linux-" + arch + "-ssl1", "linux-" + arch + "-ssl3");
return Stream.of("linux_" + arch + "_ssl1", "linux_" + arch + "_ssl3");
}
case "windows": {
return Stream.of("windows-" + arch);
return Stream.of("windows_" + arch);
}
case "osx": {
return Stream.of("osx-" + arch);
return Stream.of("osx_" + arch);
}
default: {
throw new UnsupportedOperationException();

View File

@ -50,7 +50,7 @@ public final class NativeLibraryLoader {
private static final Logger logger = LoggerFactory.getLogger(NativeLibraryLoader.class);
private static final String NATIVE_RESOURCE_HOME = "META-INF/tdlight-native/";
private static final String NATIVE_RESOURCE_HOME = "META-INF/tdlightjni/";
private static final Path WORKDIR;
private static final boolean DELETE_NATIVE_LIB_AFTER_LOADING;
private static final boolean TRY_TO_PATCH_SHADED_ID;
@ -154,7 +154,7 @@ public final class NativeLibraryLoader {
private static String calculateMangledPackagePrefix() {
String maybeShaded = NativeLibraryLoader.class.getName();
// Use ! instead of . to avoid shading utilities from modifying the string
String expected = "it!tdlight!util!internal!NativeLibraryLoader".replace('!', '.');
String expected = "it!tdlight!util!NativeLibraryLoader".replace('!', '.');
if (!maybeShaded.endsWith(expected)) {
throw new UnsatisfiedLinkError(String.format(
"Could not find prefix added to %s to get %s. When shading, only adding a "

View File

@ -0,0 +1,188 @@
package it.tdlight.util;
import java.io.Serializable;
import java.util.AbstractSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import org.jctools.maps.NonBlockingHashMapLong;
/**
* A simple wrapper around {@link NonBlockingHashMapLong} making it implement the
* {@link Set} interface. All operations are Non-Blocking and multi-thread safe.
*/
public class NonBlockingHashSetLong extends AbstractSet<Long> implements Serializable {
private static final Object V = "";
private final NonBlockingHashMapLong<Object> _map;
/** Make a new empty {@link NonBlockingHashSetLong}. */
public NonBlockingHashSetLong() {
super();
_map = new NonBlockingHashMapLong<Object>();
}
@Override
public boolean addAll(Collection<? extends Long> c) {
if (!NonBlockingHashSetLong.class.equals(c.getClass())) {
return super.addAll(c);
}
boolean modified = false;
for (final LongIterator it = ((NonBlockingHashSetLong)c).longIterator(); it.hasNext(); ) {
modified |= add(it.nextLong());
}
return modified;
}
@Override
public boolean removeAll(Collection<?> c) {
if (!NonBlockingHashSetLong.class.equals(c.getClass())) {
return super.removeAll(c);
}
boolean modified = false;
for (final LongIterator it = ((NonBlockingHashSetLong)c).longIterator(); it.hasNext(); ) {
modified |= remove(it.nextLong());
}
return modified;
}
@Override
public boolean containsAll(Collection<?> c) {
if (!NonBlockingHashSetLong.class.equals(c.getClass())) {
return super.containsAll(c);
}
for (final LongIterator it = ((NonBlockingHashSetLong)c).longIterator(); it.hasNext(); ) {
if (!contains(it.nextLong())) {
return false;
}
}
return true;
}
@Override
public boolean retainAll(Collection<?> c) {
if (!NonBlockingHashSetLong.class.equals(c.getClass())) {
return super.retainAll(c);
}
boolean modified = false;
final NonBlockingHashSetLong nonBlockingHashSetLong = (NonBlockingHashSetLong) c;
for (final LongIterator it = longIterator(); it.hasNext(); ) {
if (!nonBlockingHashSetLong.contains(it.nextLong())) {
it.remove();
modified = true;
}
}
return modified;
}
@Override
public int hashCode() {
int hashCode = 0;
for (final LongIterator it = longIterator(); it.hasNext(); ) {
final long value = it.nextLong();
hashCode += (int)(value ^ (value >>> 32));
}
return hashCode;
}
/** Add {@code o} to the set.
* @return <tt>true</tt> if {@code o} was added to the set, <tt>false</tt>
* if {@code o} was already in the set.
*/
public boolean add(final long o) {
return _map.putIfAbsent(o,V) != V;
}
/**
* To support AbstractCollection.addAll
*/
@Override
public boolean add(final Long o) {
return _map.putIfAbsent(o.longValue(),V) != V;
}
/**
* @return <tt>true</tt> if {@code o} is in the set.
*/
public boolean contains(final long o) { return _map.containsKey(o); }
@Override
public boolean contains(Object o) {
return o instanceof Long && contains(((Long) o).longValue());
}
/** Remove {@code o} from the set.
* @return <tt>true</tt> if {@code o} was removed to the set, <tt>false</tt>
* if {@code o} was not in the set.
*/
public boolean remove(final long o) { return _map.remove(o) == V; }
@Override
public boolean remove(final Object o) { return o instanceof Long && remove(((Long) o).longValue()); }
/**
* Current count of elements in the set. Due to concurrent racing updates,
* the size is only ever approximate. Updates due to the calling thread are
* immediately visible to calling thread.
* @return count of elements.
*/
@Override
public int size() { return _map.size(); }
/** Empty the set. */
@Override
public void clear() { _map.clear(); }
@Override
public String toString() {
// Overloaded to avoid auto-boxing
final LongIterator it = longIterator();
if (!it.hasNext()) {
return "[]";
}
final StringBuilder sb = new StringBuilder().append('[');
for (;;) {
sb.append(it.next());
if (!it.hasNext()) {
return sb.append(']').toString();
}
sb.append(", ");
}
}
@Override
public Iterator<Long>iterator() { return _map.keySet().iterator(); }
public LongIterator longIterator() {
return (LongIterator) _map.keySet().iterator();
}
// ---
/**
* Atomically make the set immutable. Future calls to mutate will throw an
* IllegalStateException. Existing mutator calls in other threads racing
* with this thread and will either throw IllegalStateException or their
* update will be visible to this thread. This implies that a simple flag
* cannot make the Set immutable, because a late-arriving update in another
* thread might see immutable flag not set yet, then mutate the Set after
* the {@link #readOnly} call returns. This call can be called concurrently
* (and indeed until the operation completes, all calls on the Set from any
* thread either complete normally or end up calling {@link #readOnly}
* internally).
*
* <p> This call is useful in debugging multi-threaded programs where the
* Set is constructed in parallel, but construction completes after some
* time; and after construction the Set is only read. Making the Set
* read-only will cause updates arriving after construction is supposedly
* complete to throw an {@link IllegalStateException}.
*/
// (1) call _map's immutable() call
// (2) get snapshot
// (3) CAS down a local map, power-of-2 larger than _map.size()+1/8th
// (4) start @ random, visit all snapshot, insert live keys
// (5) CAS _map to null, needs happens-after (4)
// (6) if Set call sees _map is null, needs happens-after (4) for readers
public void readOnly() {
throw new RuntimeException("Unimplemented");
}
}

View File

@ -0,0 +1,724 @@
/*
* Written by Cliff Click and released to the public domain, as explained at
* http://creativecommons.org/licenses/publicdomain
*/
package it.tdlight.util;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import org.jctools.maps.ConcurrentAutoTable;
import sun.misc.Unsafe;
/**
* A multi-threaded bit-vector set, implemented as an array of primitive
* {@code longs}. All operations are non-blocking and multi-threaded safe.
* {@link #contains(int)} calls are roughly the same speed as a {load, mask}
* sequence. {@link #add(int)} and {@link #remove(int)} calls are a tad more
* expensive than a {load, mask, store} sequence because they must use a CAS.
* The bit-vector is auto-sizing.
*
* <p><em>General note of caution:</em> The Set API allows the use of {@link Integer}
* with silent autoboxing - which can be very expensive if many calls are
* being made. Since autoboxing is silent you may not be aware that this is
* going on. The built-in API takes lower-case {@code ints} and is much more
* efficient.
*
* <p>Space: space is used in proportion to the largest element, as opposed to
* the number of elements (as is the case with hash-table based Set
* implementations). Space is approximately (largest_element/8 + 64) bytes.
*
* The implementation is a simple bit-vector using CAS for update.
*
* @since 1.5
* @author Cliff Click
*/
public class NonBlockingSetInt extends AbstractSet<Integer> implements Serializable {
private static final long serialVersionUID = 1234123412341234123L;
private static final Unsafe _unsafe = Unsafe.getUnsafe();
// --- Bits to allow atomic update of the NBSI
private static final long _nbsi_offset;
static { // <clinit>
Field f = null;
try {
f = NonBlockingSetInt.class.getDeclaredField("_nbsi");
} catch( NoSuchFieldException e ) {
}
_nbsi_offset = _unsafe.objectFieldOffset(f);
}
private final boolean CAS_nbsi( NBSI old, NBSI nnn ) {
return _unsafe.compareAndSwapObject(this, _nbsi_offset, old, nnn );
}
// The actual Set of Joy, which changes during a resize event. The
// Only Field for this class, so I can atomically change the entire
// set implementation with a single CAS.
private transient NBSI _nbsi;
/** Create a new empty bit-vector */
public NonBlockingSetInt( ) {
_nbsi = new NBSI(63, new ConcurrentAutoTable(), this); // The initial 1-word set
}
private NonBlockingSetInt(NonBlockingSetInt a, NonBlockingSetInt b) {
_nbsi = new NBSI(a._nbsi,b._nbsi,new ConcurrentAutoTable(),this);
}
/**
* Overridden to avoid auto-boxing for NonBlockingSetInt.
*
* @param c The collection to add to this set.
* @return True if the set was modified.
*/
@Override
public boolean addAll(Collection<? extends Integer> c) {
if (!NonBlockingSetInt.class.equals(c.getClass())) {
return super.addAll(c);
}
boolean modified = false;
for (final IntIterator it = ((NonBlockingSetInt)c).intIterator(); it.hasNext(); ) {
modified |= add(it.next());
}
return modified;
}
/**
* Overridden to avoid auto-boxing for NonBlockingSetInt.
*
* @param c The collection to remove from this set.
* @return True if the set was modified.
*/
@Override
public boolean removeAll(Collection<?> c) {
if (!NonBlockingSetInt.class.equals(c.getClass())) {
return super.removeAll(c);
}
boolean modified = false;
for (final IntIterator it = ((NonBlockingSetInt)c).intIterator(); it.hasNext(); ) {
modified |= remove(it.next());
}
return modified;
}
@Override
public boolean containsAll(Collection<?> c) {
if (!NonBlockingSetInt.class.equals(c.getClass())) {
return super.containsAll(c);
}
for (final IntIterator it = ((NonBlockingSetInt)c).intIterator(); it.hasNext(); ) {
if (!contains(it.next())) {
return false;
}
}
return true;
}
@Override
public boolean retainAll(Collection<?> c) {
if (!NonBlockingSetInt.class.equals(c.getClass())) {
return super.retainAll(c);
}
boolean modified = false;
final NonBlockingSetInt nonBlockingSetInt = (NonBlockingSetInt) c;
for (final IntIterator it = intIterator(); it.hasNext(); ) {
if (!nonBlockingSetInt.contains(it.next())) {
it.remove();
modified = true;
}
}
return modified;
}
@Override
public int hashCode() {
int hashCode = 0;
for (final IntIterator it = intIterator(); it.hasNext(); ) {
hashCode += it.next();
}
return hashCode;
}
/**
* Add {@code i} to the set. Uppercase {@link Integer} version of add,
* requires auto-unboxing. When possible use the {@code int} version of
* {@link #add(int)} for efficiency.
* @throws IllegalArgumentException if i is negative.
* @return <tt>true</tt> if i was added to the set.
*/
@Override
public boolean add ( final Integer i ) {
return add(i.intValue());
}
/**
* Test if {@code o} is in the set. This is the uppercase {@link Integer}
* version of contains, requires a type-check and auto-unboxing. When
* possible use the {@code int} version of {@link #contains(int)} for
* efficiency.
* @return <tt>true</tt> if i was in the set.
*/
@Override
public boolean contains( final Object o ) {
return o instanceof Integer && contains(((Integer) o).intValue());
}
/**
* Remove {@code o} from the set. This is the uppercase {@link Integer}
* version of remove, requires a type-check and auto-unboxing. When
* possible use the {@code int} version of {@link #remove(int)} for
* efficiency.
* @return <tt>true</tt> if i was removed to the set.
*/
@Override
public boolean remove( final Object o ) {
return o instanceof Integer && remove(((Integer) o).intValue());
}
/**
* Add {@code i} to the set. This is the lower-case '{@code int}' version
* of {@link #add} - no autoboxing. Negative values throw
* IllegalArgumentException.
* @throws IllegalArgumentException if i is negative.
* @return <tt>true</tt> if i was added to the set.
*/
public boolean add( final int i ) {
if( i < 0 ) throw new IllegalArgumentException(""+i);
return _nbsi.add(i);
}
/**
* Test if {@code i} is in the set. This is the lower-case '{@code int}'
* version of {@link #contains} - no autoboxing.
* @return <tt>true</tt> if i was int the set.
*/
public boolean contains( final int i ) { return i >= 0 && _nbsi.contains(i); }
/**
* Remove {@code i} from the set. This is the fast lower-case '{@code int}'
* version of {@link #remove} - no autoboxing.
* @return <tt>true</tt> if i was added to the set.
*/
public boolean remove ( final int i ) { return i >= 0 && _nbsi.remove(i); }
/**
* Current count of elements in the set. Due to concurrent racing updates,
* the size is only ever approximate. Updates due to the calling thread are
* immediately visible to calling thread.
* @return count of elements.
*/
@Override
public int size ( ) { return _nbsi.size( ); }
/** Empty the bitvector. */
@Override
public void clear ( ) {
NBSI cleared = new NBSI(63, new ConcurrentAutoTable(), this); // An empty initial NBSI
while( !CAS_nbsi( _nbsi, cleared ) ) // Spin until clear works
;
}
@Override
public String toString() {
// Overloaded to avoid auto-boxing
final IntIterator it = intIterator();
if (!it.hasNext()) {
return "[]";
}
final StringBuilder sb = new StringBuilder().append('[');
for (;;) {
sb.append(it.next());
if (!it.hasNext()) {
return sb.append(']').toString();
}
sb.append(", ");
}
}
public int sizeInBytes() { return _nbsi.sizeInBytes(); }
/*****************************************************************
*
* bitwise comparisons optimised for NBSI
*
*****************************************************************/
public NonBlockingSetInt intersect(final NonBlockingSetInt op) {
NonBlockingSetInt res = new NonBlockingSetInt(this,op);
res._nbsi.intersect(res._nbsi, this._nbsi, op._nbsi);
return res;
}
public NonBlockingSetInt union(final NonBlockingSetInt op) {
NonBlockingSetInt res = new NonBlockingSetInt(this,op);
res._nbsi.union(res._nbsi, this._nbsi, op._nbsi);
return res;
}
// public NonBlockingSetInt not(final NonBlockingSetInt op) {
//
// }
/** Verbose printout of internal structure for debugging. */
public void print() { _nbsi.print(0); }
/**
* Standard Java {@link Iterator}. Not very efficient because it
* auto-boxes the returned values.
*/
@Override
public Iterator<Integer> iterator( ) { return new iter(); }
public IntIterator intIterator() { return new NBSIIntIterator(); }
private class NBSIIntIterator implements IntIterator {
NBSI nbsi;
int index = -1;
int prev = -1;
NBSIIntIterator() {
nbsi = _nbsi;
advance();
}
private void advance() {
while( true ) {
index++; // Next index
while( (index>>6) >= nbsi._bits.length ) { // Index out of range?
if( nbsi._new == null ) { // New table?
index = -2; // No, so must be all done
return; //
}
nbsi = nbsi._new; // Carry on, in the new table
}
if( nbsi.contains(index) ) return;
}
}
@Override
public int next() {
if( index == -1 ) throw new NoSuchElementException();
prev = index;
advance();
return prev;
}
@Override
public boolean hasNext() {
return index != -2;
}
@Override
public void remove() {
if( prev == -1 ) throw new IllegalStateException();
nbsi.remove(prev);
prev = -1;
}
}
private class iter implements Iterator<Integer> {
NBSIIntIterator intIterator;
iter() { intIterator = new NBSIIntIterator(); }
@Override
public boolean hasNext() { return intIterator.hasNext(); }
@Override
public Integer next() { return intIterator.next(); }
@Override
public void remove() { intIterator.remove(); }
}
// --- writeObject -------------------------------------------------------
// Write a NBSI to a stream
private void writeObject(java.io.ObjectOutputStream s) throws IOException {
s.defaultWriteObject(); // Nothing to write
final NBSI nbsi = _nbsi; // The One Field is transient
final int len = _nbsi._bits.length<<6;
s.writeInt(len); // Write max element
for( int i=0; i<len; i++ )
s.writeBoolean( _nbsi.contains(i) );
}
// --- readObject --------------------------------------------------------
// Read a CHM from a stream
private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException {
s.defaultReadObject(); // Read nothing
final int len = s.readInt(); // Read max element
_nbsi = new NBSI(len, new ConcurrentAutoTable(), this);
for( int i=0; i<len; i++ ) // Read all bits
if( s.readBoolean() )
_nbsi.add(i);
}
// --- NBSI ----------------------------------------------------------------
private static final class NBSI {
// Back pointer to the parent wrapper; sorta like make the class non-static
private transient final NonBlockingSetInt _non_blocking_set_int;
// Used to count elements: a high-performance counter.
private transient final ConcurrentAutoTable _size;
// The Bits
private final long _bits[];
// --- Bits to allow Unsafe access to arrays
private static final int _Lbase = _unsafe.arrayBaseOffset(long[].class);
private static final int _Lscale = _unsafe.arrayIndexScale(long[].class);
private static long rawIndex(final long[] ary, final int idx) {
assert idx >= 0 && idx < ary.length;
return _Lbase + idx * _Lscale;
}
private final boolean CAS( int idx, long old, long nnn ) {
return _unsafe.compareAndSwapLong( _bits, rawIndex(_bits, idx), old, nnn );
}
// --- Resize
// The New Table, only set once to non-zero during a resize.
// Must be atomically set.
private NBSI _new;
private static final long _new_offset;
static { // <clinit>
Field f = null;
try {
f = NBSI.class.getDeclaredField("_new");
} catch( NoSuchFieldException e ) {
}
_new_offset = _unsafe.objectFieldOffset(f);
}
private final boolean CAS_new( NBSI nnn ) {
return _unsafe.compareAndSwapObject(this, _new_offset, null, nnn );
}
private transient final AtomicInteger _copyIdx; // Used to count bits started copying
private transient final AtomicInteger _copyDone; // Used to count words copied in a resize operation
private transient final int _sum_bits_length; // Sum of all nested _bits.lengths
private static final long mask( int i ) { return 1L<<(i&63); }
// I need 1 free bit out of 64 to allow for resize. I do this by stealing
// the high order bit - but then I need to do something with adding element
// number 63 (and friends). I could use a mod63 function but it's more
// efficient to handle the mod-64 case as an exception.
//
// Every 64th bit is put in it's own recursive bitvector. If the low 6 bits
// are all set, we shift them off and recursively operate on the _nbsi64 set.
private final NBSI _nbsi64;
private NBSI(int max_elem, ConcurrentAutoTable ctr, NonBlockingSetInt nonb ) {
super();
_non_blocking_set_int = nonb;
_size = ctr;
_copyIdx = ctr == null ? null : new AtomicInteger();
_copyDone = ctr == null ? null : new AtomicInteger();
// The main array of bits
_bits = new long[(int)(((long)max_elem+63)>>>6)];
// Every 64th bit is moved off to it's own subarray, so that the
// sign-bit is free for other purposes
_nbsi64 = ((max_elem+1)>>>6) == 0 ? null : new NBSI((max_elem+1)>>>6, null, null);
_sum_bits_length = _bits.length + (_nbsi64==null ? 0 : _nbsi64._sum_bits_length);
}
/** built a new NBSI with buffers large enough to hold bitwise operations on the operands **/
private NBSI(NBSI a, NBSI b, ConcurrentAutoTable ctr, NonBlockingSetInt nonb) {
super();
_non_blocking_set_int = nonb;
_size = ctr;
_copyIdx = ctr == null ? null : new AtomicInteger();
_copyDone = ctr == null ? null : new AtomicInteger();
if(!has_bits(a) && !has_bits(b)) {
_bits = null;
_nbsi64 = null;
_sum_bits_length = 0;
return;
}
// todo - clean this nastiness up
// essentially just safely creates new empty buffers for each of the recursive bitsets
if(!has_bits(a)) {
_bits = new long[b._bits.length];
_nbsi64 = new NBSI(null,b._nbsi64,null,null);
} else if(!has_bits(b)) {
_bits = new long[a._bits.length];
_nbsi64 = new NBSI(null,a._nbsi64,null,null);
} else {
int bit_length = a._bits.length > b._bits.length ? a._bits.length : b._bits.length;
_bits = new long[bit_length];
_nbsi64 = new NBSI(a._nbsi64,b._nbsi64,null,null);
}
_sum_bits_length = _bits.length + _nbsi64._sum_bits_length;
}
private static boolean has_bits(NBSI n) {
return n != null && n._bits != null;
}
// Lower-case 'int' versions - no autoboxing, very fast.
// 'i' is known positive.
public boolean add( final int i ) {
// Check for out-of-range for the current size bit vector.
// If so we need to grow the bit vector.
if( (i>>6) >= _bits.length )
return install_larger_new_bits(i). // Install larger pile-o-bits (duh)
help_copy().add(i); // Finally, add to the new table
// Handle every 64th bit via using a nested array
NBSI nbsi = this; // The bit array being added into
int j = i; // The bit index being added
while( (j&63) == 63 ) { // Bit 64? (low 6 bits are all set)
nbsi = nbsi._nbsi64; // Recurse
j = j>>6; // Strip off low 6 bits (all set)
}
final long mask = mask(j);
long old;
do {
old = nbsi._bits[j>>6]; // Read old bits
if( old < 0 ) // Not mutable?
// Not mutable: finish copy of word, and retry on copied word
return help_copy_impl(i).help_copy().add(i);
if( (old & mask) != 0 ) return false; // Bit is already set?
} while( !nbsi.CAS( j>>6, old, old | mask ) );
_size.add(1);
return true;
}
public boolean remove( final int i ) {
if( (i>>6) >= _bits.length ) // Out of bounds? Not in this array!
return _new==null ? false : help_copy().remove(i);
// Handle every 64th bit via using a nested array
NBSI nbsi = this; // The bit array being added into
int j = i; // The bit index being added
while( (j&63) == 63 ) { // Bit 64? (low 6 bits are all set)
nbsi = nbsi._nbsi64; // Recurse
j = j>>6; // Strip off low 6 bits (all set)
}
final long mask = mask(j);
long old;
do {
old = nbsi._bits[j>>6]; // Read old bits
if( old < 0 ) // Not mutable?
// Not mutable: finish copy of word, and retry on copied word
return help_copy_impl(i).help_copy().remove(i);
if( (old & mask) == 0 ) return false; // Bit is already clear?
} while( !nbsi.CAS( j>>6, old, old & ~mask ) );
_size.add(-1);
return true;
}
public boolean contains( final int i ) {
if( (i>>6) >= _bits.length ) // Out of bounds? Not in this array!
return _new==null ? false : help_copy().contains(i);
// Handle every 64th bit via using a nested array
NBSI nbsi = this; // The bit array being added into
int j = i; // The bit index being added
while( (j&63) == 63 ) { // Bit 64? (low 6 bits are all set)
nbsi = nbsi._nbsi64; // Recurse
j = j>>6; // Strip off low 6 bits (all set)
}
final long mask = mask(j);
long old = nbsi._bits[j>>6]; // Read old bits
if( old < 0 ) // Not mutable?
// Not mutable: finish copy of word, and retry on copied word
return help_copy_impl(i).help_copy().contains(i);
// Yes mutable: test & return bit
return (old & mask) != 0;
}
/**
* Bitwise operations which store the result in this instance.
* Assumes that this instance contains ample buffer space to store the largest
* buffer from each NBSI in the recursive bitmap.
*
* Also assumes that this method is called during the construction process of
* the bitset before the instance could be leaked to multiple threads.
***/
public boolean intersect(NBSI dest, NBSI a, NBSI b) {
// terminate recursion if one bitset is missing data
// since that word should be left as 0L anyway
if(!has_bits(a) || !has_bits(b))
return true;
for(int i = 0; i < dest._bits.length; i++) {
long left = a.safe_read_word(i,0L);
long right = b.safe_read_word(i,0L);
dest._bits[i] = (left & right) & Long.MAX_VALUE; // mask sign bit
}
// todo - recompute size
return intersect(dest._nbsi64, a._nbsi64, b._nbsi64);
}
public boolean union(NBSI dest, NBSI a, NBSI b) {
// terminate recursion if neiter bitset has data
if(!has_bits(a) && !has_bits(b))
return true;
if(has_bits(a) || has_bits(b)) {
for(int i = 0; i < dest._bits.length; i++) {
long left = a == null ? 0L : a.safe_read_word(i,0);
long right = b == null ? 0L : b.safe_read_word(i,0);
dest._bits[i] = (left | right) & Long.MAX_VALUE;
}
}
return union(dest._nbsi64, a == null ? null : a._nbsi64, b == null ? null : b._nbsi64);
}
/**************************************************************************/
private long safe_read_word(int i, long default_word) {
if(i >= _bits.length) {
// allow reading past the end of the buffer filling in a default word
return default_word;
}
long word = _bits[i];
if(word < 0) {
NBSI nb = help_copy_impl(i);
if(nb._non_blocking_set_int == null) {
return default_word;
}
word = nb.help_copy()._bits[i];
}
return word;
}
public int sizeInBytes() { return (int)_bits.length; }
public int size() { return (int)_size.get(); }
// Must grow the current array to hold an element of size i
private NBSI install_larger_new_bits( final int i ) {
if( _new == null ) {
// Grow by powers of 2, to avoid minor grow-by-1's.
// Note: must grow by exact powers-of-2 or the by-64-bit trick doesn't work right
int sz = (_bits.length<<6)<<1;
// CAS to install a new larger size. Did it work? Did it fail? We
// don't know and don't care. Only One can be installed, so if
// another thread installed a too-small size, we can't help it - we
// must simply install our new larger size as a nested-resize table.
CAS_new(new NBSI(sz, _size, _non_blocking_set_int));
}
// Return self for 'fluid' programming style
return this;
}
// Help any top-level NBSI to copy until completed.
// Always return the _new version of *this* NBSI, in case we're nested.
private NBSI help_copy() {
// Pick some words to help with - but only help copy the top-level NBSI.
// Nested NBSI waits until the top is done before we start helping.
NBSI top_nbsi = _non_blocking_set_int._nbsi;
final int HELP = 8; // Tuning number: how much copy pain are we willing to inflict?
// We "help" by forcing individual bit indices to copy. However, bits
// come in lumps of 64 per word, so we just advance the bit counter by 64's.
int idx = top_nbsi._copyIdx.getAndAdd(64*HELP);
for( int i=0; i<HELP; i++ ) {
int j = idx+i*64;
j %= (top_nbsi._bits.length<<6); // Limit, wrap to array size; means we retry indices
top_nbsi.help_copy_impl(j );
top_nbsi.help_copy_impl(j+63); // Also force the nested-by-64 bit
}
// Top level guy ready to promote?
// Note: WE may not be the top-level guy!
if( top_nbsi._copyDone.get() == top_nbsi._sum_bits_length )
// One shot CAS to promote - it may fail since we are racing; others
// may promote as well
if( _non_blocking_set_int.CAS_nbsi( top_nbsi, top_nbsi._new ) ) {
//System.out.println("Promote at top level to size "+(_non_blocking_set_int._nbsi._bits.length<<6));
}
// Return the new bitvector for 'fluid' programming style
return _new;
}
// Help copy this one word. State Machine.
// (1) If not "made immutable" in the old array, set the sign bit to make
// it immutable.
// (2) If non-zero in old array & zero in new, CAS new from 0 to copy-of-old
// (3) If non-zero in old array & non-zero in new, CAS old to zero
// (4) Zero in old, new is valid
// At this point, old should be immutable-zero & new has a copy of bits
private NBSI help_copy_impl( int i ) {
// Handle every 64th bit via using a nested array
NBSI old = this; // The bit array being copied from
NBSI nnn = _new; // The bit array being copied to
if( nnn == null ) return this; // Promoted already
int j = i; // The bit index being added
while( (j&63) == 63 ) { // Bit 64? (low 6 bits are all set)
old = old._nbsi64; // Recurse
nnn = nnn._nbsi64; // Recurse
j = j>>6; // Strip off low 6 bits (all set)
}
// Transit from state 1: word is not immutable yet
// Immutable is in bit 63, the sign bit.
long bits = old._bits[j>>6];
while( bits >= 0 ) { // Still in state (1)?
long oldbits = bits;
bits |= mask(63); // Target state of bits: sign-bit means immutable
if( old.CAS( j>>6, oldbits, bits ) ) {
if( oldbits == 0 ) _copyDone.addAndGet(1);
break; // Success - old array word is now immutable
}
bits = old._bits[j>>6]; // Retry if CAS failed
}
// Transit from state 2: non-zero in old and zero in new
if( bits != mask(63) ) { // Non-zero in old?
long new_bits = nnn._bits[j>>6];
if( new_bits == 0 ) { // New array is still zero
new_bits = bits & ~mask(63); // Desired new value: a mutable copy of bits
// One-shot CAS attempt, no loop, from 0 to non-zero.
// If it fails, somebody else did the copy for us
if( !nnn.CAS( j>>6, 0, new_bits ) )
new_bits = nnn._bits[j>>6]; // Since it failed, get the new value
assert new_bits != 0;
}
// Transit from state 3: non-zero in old and non-zero in new
// One-shot CAS attempt, no loop, from non-zero to 0 (but immutable)
if( old.CAS( j>>6, bits, mask(63) ) )
_copyDone.addAndGet(1); // One more word finished copying
}
// Now in state 4: zero (and immutable) in old
// Return the self bitvector for 'fluid' programming style
return this;
}
private void print( int d, String msg ) {
for( int i=0; i<d; i++ )
System.out.print(" ");
System.out.println(msg);
}
private void print(int d) {
StringBuilder buf = new StringBuilder();
buf.append("NBSI - _bits.len=");
NBSI x = this;
while( x != null ) {
buf.append(" "+x._bits.length);
x = x._nbsi64;
}
print(d,buf.toString());
x = this;
while( x != null ) {
for( int i=0; i<x._bits.length; i++ )
System.out.print(Long.toHexString(x._bits[i])+" ");
x = x._nbsi64;
System.out.println();
}
if( _copyIdx.get() != 0 || _copyDone.get() != 0 )
print(d,"_copyIdx="+_copyIdx.get()+" _copyDone="+_copyDone.get()+" _words_to_cpy="+_sum_bits_length);
if( _new != null ) {
print(d,"__has_new - ");
_new.print(d+1);
}
}
}
}

View File

@ -0,0 +1,32 @@
package it.tdlight.util;
import java.util.function.IntConsumer;
public final class SimpleIntQueue {
public int size = 0;
public int[] a = new int[16];
public void add(int i) {
if (size >= a.length) {
int[] prev = a;
a = new int[a.length << 1];
System.arraycopy(prev, 0, a, 0, prev.length);
}
a[size++] = i;
}
public void drain(IntConsumer consumer) {
for (int i = 0; i < size; i++) {
consumer.accept(a[i]);
}
reset();
}
public void reset() {
size = 0;
}
public boolean isContentful() {
return size > 0;
}
}

View File

@ -2,6 +2,8 @@ module tdlight.java {
requires tdlight.api;
requires org.reactivestreams;
requires org.slf4j;
requires jctools.core;
requires jdk.unsupported;
requires static com.google.zxing;
requires static reactor.blockhound;
exports it.tdlight.tdnative;