定时器RXjava:控制系统函数

Timer RXjava: control system function

我对 RXjava 有疑问。 我的系统有三个 observable,它们将信号(是温度传感器)发送到应用函数 zip 的侦听器,然后计算接收到的值的平均值。

我必须实现一个功能,根据参数 "t",平均温度超出范围几毫秒后,系统会发出异常信号。

例如:

a = anomaly
x = average value
- = Second

if t = 3:

x-x-x-x-a-a-x => ok
x-x-x-a-a-a-x => ko

我的代码在这里:

public class Example extends Thread {

@override
public void run() {
    /*Create 3 observable*/
    Observable<Double> alfa = Observable.create((
            Subscriber<? super Double> subscriber) -> {
        new ObservableTempStream().start();
    });

    Observable<Double> bravo = Observable.create((
            Subscriber<? super Double> subscriber) -> {
        new ObservableTempStream().start();
    });

    Observable<Double> charlie = Observable.create((
            Subscriber<? super Double> subscriber) -> {
        new ObservableTempStream().start();
    });

    /*Create 1 observable that apply func avg with zip*/
    ConnectableObservable<Double> averageTempStream = Observable.zip(
            alfa, bravo, charlie,
            (Double a, Double b, Double c) -> ((a + b + c) / 3)).publish();

    averageTempStream.connect();


    averageTempStream.subscribe((Double v) -> {

            if ((v) < (averageTempSensors - threshold)
                || (v) > (averageTempSensors + threshold)) {
                System.out.println("Value out of threshold:  " + v);
            } else {
                System.out.println("Value avg it's ok: " + v);
            }
        }, (Throwable t) -> {
            System.out.println("error  " + t);
        }, () -> {
            System.out.println("Completed");
        });
}

}

可以应用什么策略来解决这个问题?
有没有可以和异步流一起使用的函数?

在我的代码中:
每当平均值超出范围时,我都会报告错误的存在(实际上,其中一个传感器发送了尖峰信号)。 相反,只有当平均值超出范围超过 "t" 秒时,我才必须发出错误信号。

非常感谢

这个怎么样:

averageTempStream.map((Double v) -> {    // check whether the value is ok
    (v > averageTempSensors - threshold) && (v < averageTempSensors + threshold)
})
.distinctUntilChanged()                  // ignore subsequent identical values
                                         // (e. g. "x-x-x-x-a-a-x" becomes "x- - - -a- -x")
.debounce(t, TimeUnit.SECONDS)           // only emit a value if it not followed by
                                         // another one within t seconds
                                         // (e. g. "x- - - -a- -x" becomes " - -x- - - - - -x",
                                         // because the final x comes within t seconds of the a and thus prevents it from being passed down the chain)
.subscribe((Boolean ok) -> {
    if (ok) {
        System.out.println("Value avg is ok!");
    } else {
        System.out.println("Value out of threshold!");
    }
}, (Throwable t) -> {
    System.out.println("error  " + t);
}, () -> {
    System.out.println("Completed");
});

请注意,当然,debounce 发出的所有项目都延迟了 t 秒(否则它怎么知道在该间隔内没有更新的项目出现?) - 所以 ok 信号是也耽误了您可以通过 (1) 过滤上述流以删除所有 ok 信号和 (2) 将其与仅包含(未延迟的)ok 信号的流合并来克服这个问题。