First commit
This commit is contained in:
parent
bfe1a018ab
commit
4eb61685f9
1
Flow
Submodule
1
Flow
Submodule
@ -0,0 +1 @@
|
||||
Subproject commit 8d7d0e77fd0f432ae8e538a8ee4ebc6d83c9df1b
|
@ -1,27 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<classpath>
|
||||
<classpathentry kind="src" output="target/classes" path="src/main/java">
|
||||
<attributes>
|
||||
<attribute name="optional" value="true"/>
|
||||
<attribute name="maven.pomderived" value="true"/>
|
||||
</attributes>
|
||||
</classpathentry>
|
||||
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
|
||||
<attributes>
|
||||
<attribute name="optional" value="true"/>
|
||||
<attribute name="maven.pomderived" value="true"/>
|
||||
<attribute name="test" value="true"/>
|
||||
</attributes>
|
||||
</classpathentry>
|
||||
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
|
||||
<attributes>
|
||||
<attribute name="maven.pomderived" value="true"/>
|
||||
</attributes>
|
||||
</classpathentry>
|
||||
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
|
||||
<attributes>
|
||||
<attribute name="maven.pomderived" value="true"/>
|
||||
</attributes>
|
||||
</classpathentry>
|
||||
<classpathentry kind="output" path="target/classes"/>
|
||||
</classpath>
|
12
flow/.gitignore
vendored
12
flow/.gitignore
vendored
@ -1,12 +0,0 @@
|
||||
target/
|
||||
pom.xml.tag
|
||||
pom.xml.releaseBackup
|
||||
pom.xml.versionsBackup
|
||||
pom.xml.next
|
||||
release.properties
|
||||
dependency-reduced-pom.xml
|
||||
buildNumber.properties
|
||||
.mvn/timing.properties
|
||||
|
||||
# Avoid ignoring Maven wrapper jar file (.jar files are usually ignored)
|
||||
!/.mvn/wrapper/maven-wrapper.jar
|
@ -1,23 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<projectDescription>
|
||||
<name>warppi-flow</name>
|
||||
<comment></comment>
|
||||
<projects>
|
||||
</projects>
|
||||
<buildSpec>
|
||||
<buildCommand>
|
||||
<name>org.eclipse.jdt.core.javabuilder</name>
|
||||
<arguments>
|
||||
</arguments>
|
||||
</buildCommand>
|
||||
<buildCommand>
|
||||
<name>org.eclipse.m2e.core.maven2Builder</name>
|
||||
<arguments>
|
||||
</arguments>
|
||||
</buildCommand>
|
||||
</buildSpec>
|
||||
<natures>
|
||||
<nature>org.eclipse.jdt.core.javanature</nature>
|
||||
<nature>org.eclipse.m2e.core.maven2Nature</nature>
|
||||
</natures>
|
||||
</projectDescription>
|
201
flow/LICENSE
201
flow/LICENSE
@ -1,201 +0,0 @@
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright [yyyy] [name of copyright owner]
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
@ -1,2 +0,0 @@
|
||||
# Flow
|
||||
Do not use this for your personal projects. This is a bad implementation of a MVC library.
|
52
flow/pom.xml
52
flow/pom.xml
@ -1,52 +0,0 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>it.cavallium</groupId>
|
||||
<artifactId>warppi</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</parent>
|
||||
<artifactId>warppi-flow</artifactId>
|
||||
<packaging>bundle</packaging>
|
||||
|
||||
<name>WarpPI Flow Library</name>
|
||||
<description>WarpPI Flow library project</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.felix</groupId>
|
||||
<artifactId>maven-bundle-plugin</artifactId>
|
||||
<extensions>true</extensions>
|
||||
<configuration>
|
||||
<instructions>
|
||||
<Export-Package>it.cavallium.warppi.*</Export-Package>
|
||||
<Bundle-SymbolicName>warppi-flow</Bundle-SymbolicName>
|
||||
</instructions>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-source-plugin</artifactId>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-javadoc-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
@ -1,5 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
public interface Action0 {
|
||||
void call();
|
||||
}
|
@ -1,5 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
public interface Action1<T> {
|
||||
void call(T t);
|
||||
}
|
@ -1,5 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
public interface Action2<T, U> {
|
||||
void call(T t, U u);
|
||||
}
|
@ -1,91 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
public class BehaviorSubject<T> extends Subject<T> {
|
||||
|
||||
private T lastValue;
|
||||
private boolean lastValueSet;
|
||||
|
||||
protected BehaviorSubject() {
|
||||
super();
|
||||
lastValue = null;
|
||||
lastValueSet = false;
|
||||
}
|
||||
|
||||
protected BehaviorSubject(final T initialValue) {
|
||||
super();
|
||||
lastValue = initialValue;
|
||||
lastValueSet = true;
|
||||
}
|
||||
|
||||
public final static <T> BehaviorSubject<T> create() {
|
||||
return new BehaviorSubject<>();
|
||||
}
|
||||
|
||||
public final static <T> BehaviorSubject<T> create(final T initialValue) {
|
||||
return new BehaviorSubject<>(initialValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
for (final Subscriber<? super T> sub : subscribers)
|
||||
sub.onComplete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(final Throwable e) {
|
||||
for (final Subscriber<? super T> sub : subscribers)
|
||||
sub.onError(e);;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(final T t) {
|
||||
lastValue = t;
|
||||
lastValueSet = true;
|
||||
for (final Subscriber<? super T> sub : subscribers)
|
||||
sub.onNext(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
Throwable getThrowable() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean hasComplete() {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean hasObservers() {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean hasThrowable() {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
Subject<T> toSerialized() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSubscribe(final Disposable d) {
|
||||
@SuppressWarnings("unchecked")
|
||||
final DisposableOfSubscriber ds = (DisposableOfSubscriber) d;
|
||||
final Subscriber<? super T> s = ds.getSubscriber();
|
||||
if (lastValueSet)
|
||||
s.onNext(lastValue);
|
||||
}
|
||||
|
||||
public T getLastValue() {
|
||||
return lastValue;
|
||||
}
|
||||
|
||||
}
|
@ -1,36 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public interface Consumer<T> {
|
||||
|
||||
/**
|
||||
* Performs this operation on the given argument.
|
||||
*
|
||||
* @param t
|
||||
* the input argument
|
||||
*/
|
||||
void accept(T t);
|
||||
|
||||
/**
|
||||
* Returns a composed {@code Consumer} that performs, in sequence, this
|
||||
* operation followed by the {@code after} operation. If performing either
|
||||
* operation throws an exception, it is relayed to the caller of the
|
||||
* composed operation. If performing this operation throws an exception,
|
||||
* the {@code after} operation will not be performed.
|
||||
*
|
||||
* @param after
|
||||
* the operation to perform after this operation
|
||||
* @return a composed {@code Consumer} that performs in sequence this
|
||||
* operation followed by the {@code after} operation
|
||||
* @throws NullPointerException
|
||||
* if {@code after} is null
|
||||
*/
|
||||
default Consumer<T> andThen(final Consumer<? super T> after) {
|
||||
Objects.requireNonNull(after);
|
||||
return (final T t) -> {
|
||||
accept(t);
|
||||
after.accept(t);
|
||||
};
|
||||
}
|
||||
}
|
@ -1,7 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
public interface Disposable {
|
||||
void dispose();
|
||||
|
||||
boolean isDisposed();
|
||||
}
|
@ -1,42 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
public class IntervalsManager {
|
||||
private static List<ObservableInterval> intervals = new LinkedList<>();
|
||||
|
||||
static {
|
||||
IntervalsManager.startChecker();
|
||||
}
|
||||
|
||||
private IntervalsManager() {
|
||||
|
||||
}
|
||||
|
||||
public static void register(final ObservableInterval t) {
|
||||
synchronized (IntervalsManager.intervals) {
|
||||
if (!IntervalsManager.intervals.contains(t))
|
||||
IntervalsManager.intervals.add(t);
|
||||
}
|
||||
}
|
||||
|
||||
private static void startChecker() {
|
||||
final Thread t = new Thread(() -> {
|
||||
try {
|
||||
while (true) {
|
||||
Thread.sleep(1000);
|
||||
for (final ObservableInterval interval : IntervalsManager.intervals)
|
||||
if (interval.running)
|
||||
if (interval.subscribers.size() <= 0)
|
||||
interval.stopInterval();
|
||||
}
|
||||
} catch (final InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
Utils.setThreadDaemon(t, true);
|
||||
t.start();
|
||||
}
|
||||
}
|
@ -1,190 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
public abstract class Observable<T> implements ObservableSource<T> {
|
||||
|
||||
protected List<Subscriber<? super T>> subscribers = new LinkedList<>();
|
||||
|
||||
public Disposable subscribe() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public Disposable subscribe(final Action1<? super T> onNext) {
|
||||
return subscribe(createSubscriber(onNext));
|
||||
}
|
||||
|
||||
protected Observable<T>.DisposableOfSubscriber createDisposable(final Subscriber<? super T> sub) {
|
||||
return new DisposableOfSubscriber(sub);
|
||||
}
|
||||
|
||||
public Disposable subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError) {
|
||||
return subscribe(createSubscriber(onNext, onError));
|
||||
}
|
||||
|
||||
public Disposable subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError,
|
||||
final Action0 onCompleted) {
|
||||
return subscribe(createSubscriber(onNext, onError, onCompleted));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(final Observer<? super T> obs) {
|
||||
subscribe(createSubscriber(obs));
|
||||
}
|
||||
|
||||
public Disposable subscribe(final Subscriber<? super T> sub) {
|
||||
subscribers.add(sub);
|
||||
return createDisposable(sub);
|
||||
}
|
||||
|
||||
protected Subscriber<T> createSubscriber(final Action1<? super T> onNext) {
|
||||
return new Subscriber<T>() {
|
||||
@Override
|
||||
public void onSubscribe(final Subscription s) {}
|
||||
|
||||
@Override
|
||||
public void onNext(final T t) {
|
||||
onNext.call(t);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected Subscriber<T> createSubscriber(final Action1<? super T> onNext, final Action1<Throwable> onError) {
|
||||
return new Subscriber<T>() {
|
||||
@Override
|
||||
public void onSubscribe(final Subscription s) {}
|
||||
|
||||
@Override
|
||||
public void onNext(final T t) {
|
||||
onNext.call(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(final Throwable t) {
|
||||
onError.call(t);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected Subscriber<T> createSubscriber(final Action1<? super T> onNext, final Action1<Throwable> onError,
|
||||
final Action0 onCompl) {
|
||||
return new Subscriber<T>() {
|
||||
@Override
|
||||
public void onSubscribe(final Subscription s) {}
|
||||
|
||||
@Override
|
||||
public void onNext(final T t) {
|
||||
onNext.call(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(final Throwable t) {
|
||||
onError.call(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
onCompl.call();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected Subscriber<T> createSubscriber(final Observer<? super T> obs) {
|
||||
return new Subscriber<T>() {
|
||||
@Override
|
||||
public void onSubscribe(final Subscription s) {}
|
||||
|
||||
@Override
|
||||
public void onNext(final T t) {
|
||||
obs.onNext(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(final Throwable t) {
|
||||
obs.onError(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
obs.onComplete();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static final <T> Observable<T> merge(final Observable<T> a, final Observable<T> b) {
|
||||
return new ObservableMerged<>(a, b);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public static final <T> Observable<T> of(final Observable<T> a) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public final <U> Observable<U> map(final Function<T, U> f) {
|
||||
return new ObservableMap<>(this, f);
|
||||
}
|
||||
|
||||
public static final <T, U> Observable<Pair<T, U>> combineLatest(final Observable<T> a, final Observable<U> b) {
|
||||
return new ObservableCombinedLatest<>(a, b);
|
||||
}
|
||||
|
||||
public static final <T, U> Observable<Pair<T, U>> combineChanged(final Observable<T> a, final Observable<U> b) {
|
||||
return new ObservableCombinedChanged<>(a, b);
|
||||
}
|
||||
|
||||
public static final <T, U> Observable<Pair<T, U>> zip(final Observable<T> a, final Observable<U> b) {
|
||||
return new ObservableZipped<>(a, b);
|
||||
}
|
||||
|
||||
public static final Observable<Long> interval(final long interval) {
|
||||
return new ObservableInterval(interval);
|
||||
}
|
||||
|
||||
protected class DisposableOfSubscriber implements Disposable {
|
||||
|
||||
private final Subscriber<? super T> sub;
|
||||
|
||||
public DisposableOfSubscriber(final Subscriber<? super T> sub) {
|
||||
this.sub = sub;
|
||||
}
|
||||
|
||||
protected Subscriber<? super T> getSubscriber() {
|
||||
return sub;
|
||||
}
|
||||
|
||||
protected Observable<T> getObservable() {
|
||||
return Observable.this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
if (isDisposed())
|
||||
throw new RuntimeException("Already disposed!");
|
||||
subscribers.remove(sub);
|
||||
Observable.this.onDisposed(sub);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDisposed() {
|
||||
return !subscribers.contains(sub);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public Observable<T> doOnNext(final Action1<T> onNext) {
|
||||
final Subject<T> onNextSubject = BehaviorSubject.create();
|
||||
this.subscribe((val) -> {
|
||||
onNext.call(val);
|
||||
onNextSubject.onNext(val);
|
||||
});
|
||||
return onNextSubject;
|
||||
}
|
||||
|
||||
public void onDisposed(final Subscriber<? super T> sub) {
|
||||
|
||||
}
|
||||
}
|
@ -1,61 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
public class ObservableCombinedChanged<T, U> extends Observable<Pair<T, U>> {
|
||||
private volatile boolean initialized = false;
|
||||
private final Observable<T> a;
|
||||
private final Observable<U> b;
|
||||
private Disposable disposableA;
|
||||
private Disposable disposableB;
|
||||
|
||||
public ObservableCombinedChanged(final Observable<T> a, final Observable<U> b) {
|
||||
super();
|
||||
this.a = a;
|
||||
this.b = b;
|
||||
}
|
||||
|
||||
private void initialize() {
|
||||
this.disposableA = a.subscribe((t) -> {
|
||||
for (final Subscriber<? super Pair<T, U>> sub : subscribers)
|
||||
sub.onNext(Pair.of(t, null));
|
||||
}, (e) -> {
|
||||
for (final Subscriber<? super Pair<T, U>> sub : subscribers)
|
||||
sub.onError(e);;
|
||||
}, () -> {
|
||||
for (final Subscriber<? super Pair<T, U>> sub : subscribers)
|
||||
sub.onComplete();;
|
||||
});
|
||||
this.disposableB = b.subscribe((t) -> {
|
||||
for (final Subscriber<? super Pair<T, U>> sub : subscribers)
|
||||
sub.onNext(Pair.of(null, t));;
|
||||
}, (e) -> {
|
||||
for (final Subscriber<? super Pair<T, U>> sub : subscribers)
|
||||
sub.onError(e);;
|
||||
}, () -> {
|
||||
for (final Subscriber<? super Pair<T, U>> sub : subscribers)
|
||||
sub.onComplete();;
|
||||
});
|
||||
}
|
||||
|
||||
private void chechInitialized() {
|
||||
if (!initialized) {
|
||||
initialized = true;
|
||||
initialize();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Disposable subscribe(final Subscriber<? super Pair<T, U>> sub) {
|
||||
final Disposable disp = super.subscribe(sub);
|
||||
chechInitialized();
|
||||
return disp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDisposed(final Subscriber<? super Pair<T, U>> sub) {
|
||||
super.onDisposed(sub);
|
||||
this.disposableA.dispose();
|
||||
this.disposableB.dispose();
|
||||
}
|
||||
}
|
@ -1,74 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
public class ObservableCombinedLatest<T, U> extends Observable<Pair<T, U>> {
|
||||
private volatile boolean initialized = false;
|
||||
private final Observable<T> a;
|
||||
private final Observable<U> b;
|
||||
private Disposable disposableA;
|
||||
private Disposable disposableB;
|
||||
private volatile T lastA;
|
||||
private volatile U lastB;
|
||||
private volatile boolean didAOneTime;
|
||||
private volatile boolean didBOneTime;
|
||||
|
||||
public ObservableCombinedLatest(final Observable<T> a, final Observable<U> b) {
|
||||
super();
|
||||
this.a = a;
|
||||
this.b = b;
|
||||
}
|
||||
|
||||
private void initialize() {
|
||||
this.disposableA = a.subscribe((t) -> {
|
||||
lastA = t;
|
||||
didAOneTime = true;
|
||||
receivedNext();
|
||||
}, (e) -> {
|
||||
for (final Subscriber<? super Pair<T, U>> sub : subscribers)
|
||||
sub.onError(e);;
|
||||
}, () -> {
|
||||
for (final Subscriber<? super Pair<T, U>> sub : subscribers)
|
||||
sub.onComplete();;
|
||||
});
|
||||
this.disposableB = b.subscribe((t) -> {
|
||||
lastB = t;
|
||||
didBOneTime = true;
|
||||
receivedNext();
|
||||
}, (e) -> {
|
||||
for (final Subscriber<? super Pair<T, U>> sub : subscribers)
|
||||
sub.onError(e);;
|
||||
}, () -> {
|
||||
for (final Subscriber<? super Pair<T, U>> sub : subscribers)
|
||||
sub.onComplete();;
|
||||
});
|
||||
}
|
||||
|
||||
private void receivedNext() {
|
||||
if (didAOneTime && didBOneTime)
|
||||
subscribers.forEach(sub -> {
|
||||
sub.onNext(Pair.of(lastA, lastB));
|
||||
});
|
||||
}
|
||||
|
||||
private void chechInitialized() {
|
||||
if (!initialized) {
|
||||
initialized = true;
|
||||
initialize();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Disposable subscribe(final Subscriber<? super Pair<T, U>> sub) {
|
||||
final Disposable disp = super.subscribe(sub);
|
||||
chechInitialized();
|
||||
return disp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDisposed(final Subscriber<? super Pair<T, U>> sub) {
|
||||
super.onDisposed(sub);
|
||||
this.disposableA.dispose();
|
||||
this.disposableB.dispose();
|
||||
}
|
||||
}
|
@ -1,63 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
public class ObservableInterval extends Observable<Long> {
|
||||
private final long interval;
|
||||
volatile boolean running;
|
||||
volatile Thread timeThread;
|
||||
|
||||
protected ObservableInterval(final long interval) {
|
||||
super();
|
||||
this.interval = interval;
|
||||
try {
|
||||
startInterval();
|
||||
} catch (final InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
void stopInterval() {
|
||||
if (running) {
|
||||
running = false;
|
||||
timeThread.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Disposable subscribe(final Subscriber<? super Long> sub) {
|
||||
try {
|
||||
startInterval();
|
||||
} catch (final InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return super.subscribe(sub);
|
||||
}
|
||||
|
||||
void startInterval() throws InterruptedException {
|
||||
if (running == false) {
|
||||
while (timeThread != null)
|
||||
Thread.sleep(100);
|
||||
timeThread = new Thread(() -> {
|
||||
try {
|
||||
while (!Thread.interrupted()) {
|
||||
for (final Subscriber<? super Long> sub : subscribers)
|
||||
sub.onNext(System.currentTimeMillis());
|
||||
Thread.sleep(interval);
|
||||
}
|
||||
} catch (final InterruptedException e) {}
|
||||
timeThread = null;
|
||||
});
|
||||
timeThread.start();
|
||||
running = true;
|
||||
}
|
||||
}
|
||||
|
||||
public static ObservableInterval create(final long l) {
|
||||
return new ObservableInterval(l);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDisposed(final Subscriber<? super Long> sub) {
|
||||
super.onDisposed(sub);
|
||||
stopInterval();
|
||||
}
|
||||
}
|
@ -1,49 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
public class ObservableMap<T, U> extends Observable<U> {
|
||||
private final Observable<T> originalObservable;
|
||||
private final Function<T, U> mapAction;
|
||||
private volatile boolean initialized = false;
|
||||
private Disposable mapDisposable;
|
||||
|
||||
public ObservableMap(final Observable<T> originalObservable, final Function<T, U> mapAction) {
|
||||
super();
|
||||
this.originalObservable = originalObservable;
|
||||
this.mapAction = mapAction;
|
||||
}
|
||||
|
||||
private void initialize() {
|
||||
this.mapDisposable = originalObservable.subscribe((t) -> {
|
||||
for (final Subscriber<? super U> sub : subscribers)
|
||||
sub.onNext(mapAction.apply(t));;
|
||||
}, (e) -> {
|
||||
for (final Subscriber<? super U> sub : subscribers)
|
||||
sub.onError(e);;
|
||||
}, () -> {
|
||||
for (final Subscriber<? super U> sub : subscribers)
|
||||
sub.onComplete();;
|
||||
});
|
||||
}
|
||||
|
||||
private void chechInitialized() {
|
||||
if (!initialized) {
|
||||
initialized = true;
|
||||
initialize();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Disposable subscribe(final Subscriber<? super U> sub) {
|
||||
final Disposable disp = super.subscribe(sub);
|
||||
chechInitialized();
|
||||
return disp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDisposed(final Subscriber<? super U> sub) {
|
||||
super.onDisposed(sub);
|
||||
mapDisposable.dispose();
|
||||
}
|
||||
}
|
@ -1,59 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
public class ObservableMerged<T> extends Observable<T> {
|
||||
private final Observable<T> originalObservableA;
|
||||
private final Observable<T> originalObservableB;
|
||||
private volatile boolean initialized = false;
|
||||
private Disposable mapDisposableA;
|
||||
private Disposable mapDisposableB;
|
||||
|
||||
public ObservableMerged(final Observable<T> originalObservableA, final Observable<T> originalObservableB) {
|
||||
super();
|
||||
this.originalObservableA = originalObservableA;
|
||||
this.originalObservableB = originalObservableB;
|
||||
}
|
||||
|
||||
private void initialize() {
|
||||
this.mapDisposableA = originalObservableA.subscribe((t) -> {
|
||||
for (final Subscriber<? super T> sub : subscribers)
|
||||
sub.onNext(t);;
|
||||
}, (e) -> {
|
||||
for (final Subscriber<? super T> sub : subscribers)
|
||||
sub.onError(e);;
|
||||
}, () -> {
|
||||
for (final Subscriber<? super T> sub : subscribers)
|
||||
sub.onComplete();;
|
||||
});
|
||||
this.mapDisposableB = originalObservableB.subscribe((t) -> {
|
||||
for (final Subscriber<? super T> sub : subscribers)
|
||||
sub.onNext(t);;
|
||||
}, (e) -> {
|
||||
for (final Subscriber<? super T> sub : subscribers)
|
||||
sub.onError(e);;
|
||||
}, () -> {
|
||||
for (final Subscriber<? super T> sub : subscribers)
|
||||
sub.onComplete();;
|
||||
});
|
||||
}
|
||||
|
||||
private void chechInitialized() {
|
||||
if (!initialized) {
|
||||
initialized = true;
|
||||
initialize();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Disposable subscribe(final Subscriber<? super T> sub) {
|
||||
final Disposable disp = super.subscribe(sub);
|
||||
chechInitialized();
|
||||
return disp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDisposed(final Subscriber<? super T> sub) {
|
||||
super.onDisposed(sub);
|
||||
this.mapDisposableA.dispose();
|
||||
this.mapDisposableB.dispose();
|
||||
}
|
||||
}
|
@ -1,5 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
public interface ObservableSource<T> {
|
||||
void subscribe(Observer<? super T> observer);
|
||||
}
|
@ -1,76 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
public class ObservableZipped<T, U> extends Observable<Pair<T, U>> {
|
||||
private volatile boolean initialized = false;
|
||||
private final Observable<T> a;
|
||||
private final Observable<U> b;
|
||||
private Disposable disposableA;
|
||||
private Disposable disposableB;
|
||||
private volatile T lastA;
|
||||
private volatile U lastB;
|
||||
private volatile boolean didA;
|
||||
private volatile boolean didB;
|
||||
|
||||
public ObservableZipped(final Observable<T> a, final Observable<U> b) {
|
||||
super();
|
||||
this.a = a;
|
||||
this.b = b;
|
||||
}
|
||||
|
||||
private void initialize() {
|
||||
this.disposableA = a.subscribe((t) -> {
|
||||
lastA = t;
|
||||
didA = true;
|
||||
receivedNext();
|
||||
}, (e) -> {
|
||||
for (final Subscriber<? super Pair<T, U>> sub : subscribers)
|
||||
sub.onError(e);;
|
||||
}, () -> {
|
||||
for (final Subscriber<? super Pair<T, U>> sub : subscribers)
|
||||
sub.onComplete();;
|
||||
});
|
||||
this.disposableB = b.subscribe((t) -> {
|
||||
lastB = t;
|
||||
didB = true;
|
||||
receivedNext();
|
||||
}, (e) -> {
|
||||
for (final Subscriber<? super Pair<T, U>> sub : subscribers)
|
||||
sub.onError(e);;
|
||||
}, () -> {
|
||||
for (final Subscriber<? super Pair<T, U>> sub : subscribers)
|
||||
sub.onComplete();;
|
||||
});
|
||||
}
|
||||
|
||||
private void receivedNext() {
|
||||
if (didA && didB) {
|
||||
didA = false;
|
||||
didB = false;
|
||||
for (final Subscriber<? super Pair<T, U>> sub : subscribers)
|
||||
sub.onNext(Pair.of(lastA, lastB));;
|
||||
}
|
||||
}
|
||||
|
||||
private void chechInitialized() {
|
||||
if (!initialized) {
|
||||
initialized = true;
|
||||
initialize();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Disposable subscribe(final Subscriber<? super Pair<T, U>> sub) {
|
||||
final Disposable disp = super.subscribe(sub);
|
||||
chechInitialized();
|
||||
return disp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDisposed(final Subscriber<? super Pair<T, U>> sub) {
|
||||
super.onDisposed(sub);
|
||||
this.disposableA.dispose();
|
||||
this.disposableB.dispose();
|
||||
}
|
||||
}
|
@ -1,11 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
public interface Observer<T> {
|
||||
void onComplete();
|
||||
|
||||
void onError(Throwable e);
|
||||
|
||||
void onNext(T t);
|
||||
|
||||
void onSubscribe(Disposable d);
|
||||
}
|
@ -1,62 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
public class SimpleSubject<T> extends Subject<T> {
|
||||
|
||||
protected SimpleSubject() {}
|
||||
|
||||
public final static <T> SimpleSubject<T> create() {
|
||||
return new SimpleSubject<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
for (final Subscriber<? super T> sub : subscribers)
|
||||
sub.onComplete();;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(final Throwable e) {
|
||||
for (final Subscriber<? super T> sub : subscribers)
|
||||
sub.onError(e);;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(final T t) {
|
||||
for (final Subscriber<? super T> sub : subscribers)
|
||||
sub.onNext(t);;
|
||||
}
|
||||
|
||||
@Override
|
||||
Throwable getThrowable() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean hasComplete() {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean hasObservers() {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean hasThrowable() {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
Subject<T> toSerialized() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSubscribe(final Disposable d) {}
|
||||
|
||||
}
|
@ -1,41 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
|
||||
abstract Throwable getThrowable();
|
||||
|
||||
abstract boolean hasComplete();
|
||||
|
||||
abstract boolean hasObservers();
|
||||
|
||||
abstract boolean hasThrowable();
|
||||
|
||||
abstract Subject<T> toSerialized();
|
||||
|
||||
@Override
|
||||
public Disposable subscribe(final Action1<? super T> onNext) {
|
||||
return subscribe(createSubscriber(onNext));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Disposable subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError) {
|
||||
return subscribe(createSubscriber(onNext, onError));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Disposable subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError,
|
||||
final Action0 onCompl) {
|
||||
return subscribe(createSubscriber(onNext, onError, onCompl));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(final Observer<? super T> obs) {
|
||||
subscribe(createSubscriber(obs));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Disposable subscribe(final Subscriber<? super T> sub) {
|
||||
final Disposable disp = super.subscribe(sub);
|
||||
onSubscribe(disp);
|
||||
return disp;
|
||||
}
|
||||
}
|
@ -1,11 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
public interface Subscriber<T> {
|
||||
default void onComplete() {}
|
||||
|
||||
default void onError(final Throwable t) {}
|
||||
|
||||
void onNext(T t);
|
||||
|
||||
void onSubscribe(Subscription s);
|
||||
}
|
@ -1,7 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
public interface Subscription {
|
||||
void cancel();
|
||||
|
||||
void request(long n);
|
||||
}
|
@ -1,135 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
public class TestFlow {
|
||||
public static void main(final String[] args) {
|
||||
try {
|
||||
|
||||
final BehaviorSubject<Float> subject0 = BehaviorSubject.create(0f);
|
||||
|
||||
final Disposable s00 = subject0.subscribe((val) -> {
|
||||
System.out.println(val);
|
||||
});
|
||||
Thread.sleep(100);
|
||||
subject0.onNext(1f);
|
||||
subject0.onNext(2f);
|
||||
subject0.onNext(3f);
|
||||
subject0.onNext(4f);
|
||||
subject0.onNext(5f);
|
||||
subject0.onNext(60f);
|
||||
s00.dispose();
|
||||
subject0.onNext(60f);
|
||||
subject0.onNext(7f);
|
||||
|
||||
subject0.onComplete();
|
||||
System.out.println("items sent.");
|
||||
|
||||
final Subject<Float> subject1 = BehaviorSubject.create(0f);
|
||||
|
||||
final Disposable s01 = subject1.map((val) -> val + 1).subscribe((val) -> {
|
||||
System.out.println(val);
|
||||
});
|
||||
Thread.sleep(100);
|
||||
subject1.onNext(1f);
|
||||
subject1.onNext(2f);
|
||||
subject1.onNext(3f);
|
||||
subject1.onNext(4f);
|
||||
subject1.onNext(5f);
|
||||
subject1.onNext(60f);
|
||||
s01.dispose();
|
||||
subject1.onNext(60f);
|
||||
subject1.onNext(7f);
|
||||
|
||||
subject1.onComplete();
|
||||
System.out.println("items sent.");
|
||||
|
||||
final BehaviorSubject<Float> subjectA = BehaviorSubject.create();
|
||||
final BehaviorSubject<Float> subjectB = BehaviorSubject.create();
|
||||
final Observable<Float> observable = Observable.merge(subjectA, subjectB);
|
||||
|
||||
final Disposable s1 = observable.subscribe((val) -> {
|
||||
System.out.println(val);
|
||||
});
|
||||
Thread.sleep(100);
|
||||
subjectA.onNext(1f);
|
||||
subjectA.onNext(2f);
|
||||
subjectA.onNext(3f);
|
||||
subjectA.onNext(4f);
|
||||
subjectA.onNext(5f);
|
||||
subjectB.onNext(60f);
|
||||
s1.dispose();
|
||||
subjectB.onNext(60f);
|
||||
subjectA.onNext(7f);
|
||||
|
||||
subjectB.onComplete();
|
||||
subjectA.onComplete();
|
||||
Thread.sleep(100);
|
||||
System.out.println("no more news subscribers left, closing publisher..");
|
||||
|
||||
final BehaviorSubject<Float> subjectC = BehaviorSubject.create();
|
||||
final BehaviorSubject<Float> subjectD = BehaviorSubject.create();
|
||||
final Observable<Pair<Float, Float>> observableCombined = Observable.combineLatest(subjectC, subjectD);
|
||||
System.out.println("Combined observable: " + observableCombined.toString());
|
||||
final Disposable s2 = observableCombined.subscribe((val) -> {
|
||||
System.out.println(val);
|
||||
});
|
||||
Thread.sleep(100);
|
||||
subjectC.onNext(1f);
|
||||
subjectC.onNext(2f);
|
||||
subjectC.onNext(3f);
|
||||
subjectC.onNext(4f);
|
||||
subjectC.onNext(5f);
|
||||
subjectD.onNext(60f);
|
||||
subjectD.onNext(60f);
|
||||
subjectC.onNext(7f);
|
||||
s2.dispose();
|
||||
|
||||
subjectD.onComplete();
|
||||
subjectC.onComplete();
|
||||
System.out.println("items sent.");
|
||||
|
||||
final ObservableInterval timA = ObservableInterval.create(100L);
|
||||
final Disposable d = timA.subscribe((t) -> {
|
||||
System.out.println(t);
|
||||
});
|
||||
|
||||
Thread.sleep(500);
|
||||
d.dispose();
|
||||
System.out.println("items sent.");
|
||||
|
||||
final ObservableInterval subjectE = ObservableInterval.create(100L);
|
||||
final BehaviorSubject<Float> subjectF = BehaviorSubject.create();
|
||||
final Observable<Pair<Long, Float>> observableZipped = Observable.zip(subjectE, subjectF);
|
||||
System.out.println("Zipped observable: " + observableZipped.toString());
|
||||
final Disposable s3 = observableZipped.subscribe((val) -> {
|
||||
System.out.println(val);
|
||||
});
|
||||
Thread.sleep(100);
|
||||
subjectF.onNext(1f);
|
||||
Thread.sleep(100);
|
||||
subjectF.onNext(2f);
|
||||
Thread.sleep(100);
|
||||
subjectF.onNext(3f);
|
||||
Thread.sleep(100);
|
||||
subjectF.onNext(4f);
|
||||
Thread.sleep(100);
|
||||
subjectF.onNext(5f);
|
||||
Thread.sleep(100);
|
||||
subjectF.onNext(60f);
|
||||
Thread.sleep(100);
|
||||
subjectF.onNext(60f);
|
||||
Thread.sleep(100);
|
||||
subjectF.onNext(7f);
|
||||
Thread.sleep(500);
|
||||
s3.dispose();
|
||||
|
||||
subjectF.onComplete();
|
||||
System.out.println("items sent.");
|
||||
|
||||
} catch (final Exception ex) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,21 +0,0 @@
|
||||
package it.cavallium.warppi.flow;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
|
||||
class Utils {
|
||||
|
||||
/**
|
||||
* Compatibility method for TeaVM
|
||||
* @param t
|
||||
* @param b
|
||||
*/
|
||||
public static void setThreadDaemon(Thread t, boolean b) {
|
||||
// Compatibility
|
||||
try {
|
||||
t.getClass().getMethod("setDaemon", Boolean.class).invoke(t, true);
|
||||
} catch (NoSuchMethodException | SecurityException | InvocationTargetException | IllegalAccessException | IllegalArgumentException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,35 +0,0 @@
|
||||
package it.cavallium.warppi;
|
||||
|
||||
import junit.framework.Assert;
|
||||
import junit.framework.Test;
|
||||
import junit.framework.TestCase;
|
||||
import junit.framework.TestSuite;
|
||||
|
||||
/**
|
||||
* Unit test for simple App.
|
||||
*/
|
||||
public class AppTest extends TestCase {
|
||||
/**
|
||||
* Create the test case
|
||||
*
|
||||
* @param testName
|
||||
* name of the test case
|
||||
*/
|
||||
public AppTest(final String testName) {
|
||||
super(testName);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the suite of tests being tested
|
||||
*/
|
||||
public static Test suite() {
|
||||
return new TestSuite(AppTest.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Rigourous Test :-)
|
||||
*/
|
||||
public void testApp() {
|
||||
Assert.assertTrue(true);
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user