Netflix Hystrix - HystrixObservableCommand asynchronous run
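I have a basic project that makes four calls to an external resource; in the current version these run synchronously. What I would like to do is wrap each call in a HystrixObservableCommand and then invoke it asynchronously.
As I understand it, calling .observe() on a HystrixObservableCommand object should immediately invoke the wrapped logic asynchronously. I must be doing something wrong, though, because it runs synchronously.
In the example code the output type is Void, because I am not interested in the output (for now). That is also why I do not assign the Observable to anything and simply call constructor.observe().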
@Component
public class LoggerProducer {

    private static final Logger LOGGER = Logger.getLogger(LoggerProducer.class);

    @Autowired
    SimpMessagingTemplate template;

    private void push(Iterable<Message> messages, String topic) throws Exception {
        template.convertAndSend("/messages/" + topic, messages);
    }

    public void splitAndPush(Iterable<Message> messages) {
        Map<MessageTypeEnum, List<Message>> groupByMessageType = StreamSupport.stream(messages.spliterator(), true)
                .collect(Collectors.groupingBy(Message::getType));

        //should be async - it's not
        new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.INFO),
                MessageTypeEnum.INFO.toString().toLowerCase()).observe();
        new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.WARN),
                MessageTypeEnum.WARN.toString().toLowerCase()).observe();
        new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.ERROR),
                MessageTypeEnum.ERROR.toString().toLowerCase()).observe();
    }

    class CommandPushToBrowser extends HystrixObservableCommand<Void> {

        private Iterable<Message> messages;
        private String messageTypeName;

        public CommandPushToBrowser(Iterable<Message> messages, String messageTypeName) {
            super(HystrixCommandGroupKey.Factory.asKey("Messages"));
            this.messageTypeName = messageTypeName;
            this.messages = messages;
        }

        @Override
        protected Observable<Void> construct() {
            return Observable.create(new Observable.OnSubscribe<Void>() {
                @Override
                public void call(Subscriber<? super Void> observer) {
                    try {
                        for (int i = 0; i < 50; i++) {
                            LOGGER.info("Count: " + i + " messageType " + messageTypeName);
                        }
                        if (null != messages) {
                            push(messages, messageTypeName);
                            LOGGER.info("Message type: " + messageTypeName + " pushed: " + messages);
                        }
                        if (!observer.isUnsubscribed()) {
                            observer.onCompleted();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        observer.onError(e);
                    }
                }
            });
        }
    }
}
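There are some pure "test" snippets in there, since I was trying to track down the problem. Please ignore the logic; the main goal is to make it run asynchronously. I know I could achieve this with a standard HystrixCommand, but that is not the goal.
Hope someone can help :)
Regards,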
Found the answer:
"Observables do not add concurrency automatically. If you are modeling
synchronous, blocking execution with an Observable, then they will
execute synchronously.
You can easily make it asynchronous by scheduling on a thread using
subscribeOn(Schedulers.io()). Here is a simple example for wrapping a
blocking call with an Observable:
https://speakerdeck.com/benjchristensen/applying-reactive-programming-with-rxjava-at-goto-chicago-2015?slide=33
However, if you are wrapping blocking calls, you should just stick
with using HystrixCommand as that’s what it’s built for and it
defaults to running everything in a separate thread. Using
HystrixCommand.observe() will give you the concurrent, async
composition you’re looking for.
HystrixObservableCommand is intended for wrapping around async,
non-blocking Observables that don’t need extra threads."
-- Ben Christensen - Netflix Edge Engineering
Source: https://groups.google.com/forum/#!topic/hystrixoss/g7ZLIudE8Rs
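To make the point in the quote concrete, here is a minimal sketch that runs on a plain JDK. It deliberately does not use Hystrix or RxJava: CompletableFuture and an ExecutorService stand in for scheduling an Observable on another thread, and the names AsyncWrapDemo, blockingCall, runWrapped and runScheduled are made up for illustration. It only shows that wrapping a blocking call changes nothing about which thread runs it, while explicitly scheduling it does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class AsyncWrapDemo {

    /** Hypothetical stand-in for a blocking call such as push(...) in the question. */
    static String blockingCall() {
        try {
            Thread.sleep(50); // simulate blocking I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Report which thread actually did the work.
        return Thread.currentThread().getName();
    }

    /** Wrapping alone adds no concurrency: the work runs on the caller's thread. */
    static String runWrapped() {
        Supplier<String> wrapped = AsyncWrapDemo::blockingCall;
        return wrapped.get();
    }

    /** Scheduling on an executor: the caller gets a future back immediately
     *  and the work runs on one of the pool's threads. */
    static CompletableFuture<String> runScheduled(ExecutorService pool) {
        return CompletableFuture.supplyAsync(AsyncWrapDemo::blockingCall, pool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            String caller = Thread.currentThread().getName();

            // Same thread as the caller, so this call blocks.
            System.out.println("wrapped ran on caller thread: "
                    + runWrapped().equals(caller));

            // Three calls started back to back; none of them blocks the caller.
            CompletableFuture<String> info = runScheduled(pool);
            CompletableFuture<String> warn = runScheduled(pool);
            CompletableFuture<String> error = runScheduled(pool);
            CompletableFuture.allOf(info, warn, error).join();

            System.out.println("scheduled ran on caller thread: "
                    + info.join().equals(caller));
        } finally {
            pool.shutdown();
        }
    }
}
```

In the question's command, the corresponding change suggested by the quoted answer would be either to apply subscribeOn(Schedulers.io()) to the Observable returned from construct(), or to move the blocking call into a HystrixCommand and call observe() on that instead.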