Binding an API callback to an RxJava Observable
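I am trying to build a reactive application that listens to a network socket on a separate thread for prices, but I am a bit stumped on how to construct the Observable. Most of the interfaces I have are constrained by the API I am using, so I cannot change them. I have distilled what I am trying to do into the test below, but I can't see how to fill in the body of the getPriceReactive() method so that the subscriber prints the prices on the console (see the comment in the code).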
    import java.util.Random;
    import org.junit.Test;
    import rx.Observable;
    import rx.Subscriber;

    public class PriceObservableTest {

        // This interface is defined externally and used by the API
        private interface ITickHandler {
            void priceReceived(double price);
        }

        // Stores the price (currently just one double for illustration)
        private class Tick {
            double price = Double.NaN;
        }

        // Implementation of handler called by API when it receives a price
        private class TickHandler implements ITickHandler {
            private final Tick tick;
            TickHandler() { this.tick = new Tick(); }
            @Override public void priceReceived(double x) { tick.price = x; }
        }

        // This class emulates the API delivering prices from the socket
        private class PriceSource {
            private final Thread thread;

            PriceSource(final ITickHandler handler) {
                thread = new Thread(new Runnable() {
                    final Random r = new Random();
                    @Override public void run() {
                        while (!Thread.currentThread().isInterrupted()) {
                            try {
                                Thread.sleep(100);
                                handler.priceReceived(r.nextDouble() * 100);
                            } catch (InterruptedException e) {
                                break;
                            }
                        }
                        System.out.println("Price thread closed");
                    }
                });
            }

            void subscribe() { thread.start(); }
            void unsubscribe() { thread.interrupt(); }
        }

        @Test
        public void simpleTest() throws Exception {
            final ITickHandler handler = new TickHandler();

            // Simulate some prices received periodically from a socket
            PriceSource prices = new PriceSource(handler);

            Observable<Tick> reactive = getPriceReactive(handler);
            reactive.subscribe(new Subscriber<Tick>() {
                @Override public void onCompleted() { }
                @Override public void onError(Throwable e) { }
                @Override public void onNext(Tick tick) {
                    System.out.println("Received price: " + tick.price);
                }
            });

            // Observe prices for 1 second. The subscriber should print them to console
            prices.subscribe();
            Thread.sleep(1000);
            prices.unsubscribe();
        }

        // Returns an observable that reacts to price changes
        private Observable<Tick> getPriceReactive(ITickHandler handler) {
            return Observable.create(new Observable.OnSubscribe<Tick>() {
                @Override public void call(Subscriber<? super Tick> subscriber) {
                    // How to call subscriber.onNext() whenever
                    // priceReceived() is called with a new price?
                }
            });
        }
    }
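Somehow subscriber.onNext() needs to be called whenever the API calls priceReceived(), but I can't quite see how to achieve this. Of course, I could store a reference to the subscriber in the TickHandler, but that rather defeats the purpose of having an Observable, doesn't it?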
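Make the transition to an Observable inside your ITickHandler implementation. What you control is the publisher, not the subscriber, so have the handler own a subject and expose it as an Observable: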
    // Needs: import rx.Observable; import rx.subjects.PublishSubject;
    private class TickHandler implements ITickHandler {
        private final Tick tick;
        private final PublishSubject<Tick> priceSubject;

        TickHandler() {
            this.tick = new Tick();
            this.priceSubject = PublishSubject.create();
        }

        @Override public void priceReceived(double x) {
            tick.price = x;
            // Push the updated tick to every current subscriber
            priceSubject.onNext(tick);
        }

        public Observable<Tick> priceReceivedObservable() {
            // Hide the subject so callers can only subscribe, not emit
            return priceSubject.asObservable();
        }
    }
You can then use it in your test like this:
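    // Declare the concrete type: priceReceivedObservable() is not part of ITickHandler
    final TickHandler handler = new TickHandler();
    PriceSource prices = new PriceSource(handler);

    handler.priceReceivedObservable()
        .subscribe(new Subscriber<Tick>() {
            @Override public void onCompleted() { }
            @Override public void onError(Throwable e) { }
            @Override public void onNext(Tick tick) {
                System.out.println("Received price: " + tick.price);
            }
        });

    // As in the original test: let prices flow for 1 second
    prices.subscribe();
    Thread.sleep(1000);
    prices.unsubscribe();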
Be warned, this is untested, as I don't write much Java :)
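For anyone who wants to see the idea without pulling in RxJava, here is a minimal sketch of the same bridge using only the JDK. It is not RxJava's implementation: `MiniSubject` is a hypothetical, hand-rolled stand-in for `PublishSubject` that just forwards each value to its listeners, and `PublishingTickHandler` and `BridgeDemo` are illustrative names as well. Only `ITickHandler` and `priceReceived()` come from the question. The point it tries to show is that the handler stays a plain `ITickHandler` as far as the API is concerned, while subscribers attach to the publisher it owns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for RxJava's PublishSubject: forwards each value
// to every listener that is registered at the time of the call.
final class MiniSubject<T> {
    private final List<Consumer<? super T>> listeners = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<? super T> listener) {
        listeners.add(listener);
    }

    void onNext(T value) {
        for (Consumer<? super T> listener : listeners) {
            listener.accept(value);
        }
    }
}

// Same shape as the API-defined callback interface in the question.
interface ITickHandler {
    void priceReceived(double price);
}

// Implements the fixed API interface and republishes every price.
final class PublishingTickHandler implements ITickHandler {
    private final MiniSubject<Double> prices = new MiniSubject<>();

    @Override
    public void priceReceived(double price) {
        prices.onNext(price);
    }

    MiniSubject<Double> prices() {
        return prices;
    }
}

final class BridgeDemo {
    // Feeds the given prices through the callback and returns what a subscriber saw.
    static List<Double> run(double... input) {
        PublishingTickHandler handler = new PublishingTickHandler();
        List<Double> seen = new ArrayList<>();
        handler.prices().subscribe(seen::add);

        ITickHandler apiView = handler; // the API only ever sees ITickHandler
        for (double price : input) {
            apiView.priceReceived(price);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(101.5, 99.25));
    }
}
```

Like `PublishSubject`, this sketch only delivers values to listeners that subscribed before the value arrived, so subscribe before starting the price source.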