How to block the thread until the Observable completes
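I am testing a scenario in which I send an event and need to know when the consumer has finished processing it before continuing the flow. That is, when I fire the event, I need to block the main thread until the consumer finishes processing. I am using an RxJava Observable, but I have not managed to block the main thread while waiting for the Observable's result.
My producer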
@Service
public class Producer {

    private MessageChannel output;

    @Autowired
    private Consumer consumer;

    @Autowired
    public Producer(Processor processor) {
        this.output = processor.output();
    }

    public void send(String event) {
        System.out.println("SENDING EVENT...");
        output.send(MessageBuilder.withPayload(event).build());
        //Observable<Boolean> obs = consumer.execute();
        //obs.subscribe();
        //Blocking process
        BlockingObservable.from(consumer.execute()).subscribe();
        //Continue to flow
        System.out.println("EVENT PROCESSED...");
    }
}
My consumer
@Service
public class Consumer {

    @StreamListener(target = Processor.INPUT)
    public void receiver(@Payload String event) {
        System.out.println("EVENT RECEIVED, PROCESSING...");
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        execute();
    }

    public Observable<Boolean> execute() {
        return Observable.<Boolean>create(emitter -> {
            try {
                System.out.println("EVENT STILL PROCESSING...");
                emitter.onNext(Boolean.TRUE);
            } catch (Exception e) {
                emitter.onError(new RuntimeException("ERROR"));
            }
            emitter.onCompleted();
        });
    }
}
You can use BlockingObservable.toFuture(consumer.execute()).get()
instead of BlockingObservable.from(consumer.execute()).subscribe()
to block the thread.
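One thing worth keeping in mind with any Future-based approach: get() only waits for whatever actually completes that Future. As far as I can tell from the question's code, execute() builds a fresh Observable on every call, so blocking on it may return before receiver() has finished its work. Below is a rough sketch of the general idea using only the JDK (CompletableFuture instead of RxJava, no Spring), where the consumer itself completes the future once processing is done. BlockingDemo, process and sendAndWait are hypothetical names for illustration, not part of the original code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the Producer/Consumer pair (no Spring, no RxJava).
final class BlockingDemo {

    // Plays the role of the consumer: does the work on another thread and
    // completes the future only when processing has really finished.
    static CompletableFuture<String> process(String event, ExecutorService worker) {
        CompletableFuture<String> done = new CompletableFuture<>();
        worker.execute(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200); // stands in for the 10 s sleep
                done.complete("processed: " + event);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                done.completeExceptionally(e);
            }
        });
        return done;
    }

    // Plays the role of Producer.send: hands off the event, then blocks the
    // calling thread until the consumer signals completion (or 5 s elapse).
    static String sendAndWait(String event) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return process(event, worker).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("consumer did not finish", e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("SENDING EVENT...");
        String result = sendAndWait("hello");
        System.out.println("EVENT PROCESSED: " + result);
    }
}
```

The timeout on get() is a design choice so the caller cannot hang forever if the consumer never reports back; drop it only if an unbounded wait is acceptable.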
You need to use the toBlocking operator (which waits for the consumer to finish) plus single to get the value
@Test
public void observableEvolveAndReturnToStringValue() {
    assertTrue(Observable.just(10)
            .map(String::valueOf)
            .toBlocking()
            .single()
            .equals("10"));
}