Fix initialization race condition
This commit is contained in:
parent
b9709313bd
commit
a2bf050742
@ -7,11 +7,11 @@ import it.tdlight.jni.TdApi.Object;
|
|||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.reactivestreams.Publisher;
|
import org.reactivestreams.Publisher;
|
||||||
import org.reactivestreams.Subscriber;
|
import org.reactivestreams.Subscriber;
|
||||||
import org.reactivestreams.Subscription;
|
import org.reactivestreams.Subscription;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -138,6 +138,7 @@ public class InternalReactiveClient implements ClientEventsHandler, ReactiveTele
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("ReactiveStreamsSubscriberImplementation")
|
||||||
@Override
|
@Override
|
||||||
public void cancel() {
|
public void cancel() {
|
||||||
if (!isClosed.get()) {
|
if (!isClosed.get()) {
|
||||||
@ -167,14 +168,14 @@ public class InternalReactiveClient implements ClientEventsHandler, ReactiveTele
|
|||||||
};
|
};
|
||||||
this.subscriber = subscriber;
|
this.subscriber = subscriber;
|
||||||
|
|
||||||
createAndRegisterClient();
|
|
||||||
subscriber.onSubscribe(subscription);
|
subscriber.onSubscribe(subscription);
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalStateException("Already subscribed");
|
throw new IllegalStateException("Already subscribed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createAndRegisterClient() {
|
@SuppressWarnings("ReactiveStreamsSubscriberImplementation")
|
||||||
|
public void createAndRegisterClient() {
|
||||||
if (clientId != null) throw new UnsupportedOperationException("Can't initialize the same client twice!");
|
if (clientId != null) throw new UnsupportedOperationException("Can't initialize the same client twice!");
|
||||||
clientId = NativeClientAccess.create();
|
clientId = NativeClientAccess.create();
|
||||||
clientManager.registerClient(clientId, this);
|
clientManager.registerClient(clientId, this);
|
||||||
@ -221,7 +222,7 @@ public class InternalReactiveClient implements ClientEventsHandler, ReactiveTele
|
|||||||
subscriber.onComplete();
|
subscriber.onComplete();
|
||||||
} else if (clientId == null) {
|
} else if (clientId == null) {
|
||||||
subscriber.onError(
|
subscriber.onError(
|
||||||
new IllegalStateException("Can't send a request to TDLib before calling \"initialize\" function!")
|
new IllegalStateException("Can't send a request to TDLib before calling \"createAndRegisterClient\" function!")
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
long queryId = clientManager.getNextQueryId();
|
long queryId = clientManager.getNextQueryId();
|
||||||
|
@ -6,6 +6,11 @@ import org.reactivestreams.Publisher;
|
|||||||
@SuppressWarnings("ReactiveStreamsPublisherImplementation")
|
@SuppressWarnings("ReactiveStreamsPublisherImplementation")
|
||||||
public interface ReactiveTelegramClient extends Publisher<ReactiveItem> {
|
public interface ReactiveTelegramClient extends Publisher<ReactiveItem> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates and registers the client
|
||||||
|
*/
|
||||||
|
void createAndRegisterClient();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends a request to the TDLib.
|
* Sends a request to the TDLib.
|
||||||
*
|
*
|
||||||
|
Loading…
x
Reference in New Issue
Block a user