定时器RXjava:控制系统函数
Timer RXjava: control system function
我对 RXjava 有疑问。
我的系统有三个 observable,它们将信号(是温度传感器)发送到应用函数 zip 的侦听器,然后计算接收到的值的平均值。
我必须实现一个功能,根据参数 "t",平均温度超出范围几毫秒后,系统会发出异常信号。
例如:
a = anomaly
x = average value
- = Second
if t = 3:
x-x-x-x-a-a-x => ok
x-x-x-a-a-a-x => ko
我的代码在这里:
public class Example extends Thread {
@override
public void run() {
/*Create 3 observable*/
Observable<Double> alfa = Observable.create((
Subscriber<? super Double> subscriber) -> {
new ObservableTempStream().start();
});
Observable<Double> bravo = Observable.create((
Subscriber<? super Double> subscriber) -> {
new ObservableTempStream().start();
});
Observable<Double> charlie = Observable.create((
Subscriber<? super Double> subscriber) -> {
new ObservableTempStream().start();
});
/*Create 1 observable that apply func avg with zip*/
ConnectableObservable<Double> averageTempStream = Observable.zip(
alfa, bravo, charlie,
(Double a, Double b, Double c) -> ((a + b + c) / 3)).publish();
averageTempStream.connect();
averageTempStream.subscribe((Double v) -> {
if ((v) < (averageTempSensors - threshold)
|| (v) > (averageTempSensors + threshold)) {
System.out.println("Value out of threshold: " + v);
} else {
System.out.println("Value avg it's ok: " + v);
}
}, (Throwable t) -> {
System.out.println("error " + t);
}, () -> {
System.out.println("Completed");
});
}
}
可以应用什么策略来解决这个问题?
有没有可以和异步流一起使用的函数?
在我的代码中:
每当平均值超出范围时,我都会报告错误的存在(实际上,其中一个传感器发送了尖峰信号)。
相反,只有当平均值超出范围超过 "t" 秒时,我才必须发出错误信号。
非常感谢
这个怎么样:
averageTempStream.map((Double v) -> { // check whether the value is ok
(v > averageTempSensors - threshold) && (v < averageTempSensors + threshold)
})
.distinctUntilChanged() // ignore subsequent identical values
// (e. g. "x-x-x-x-a-a-x" becomes "x- - - -a- -x")
.debounce(t, TimeUnit.SECONDS) // only emit a value if it not followed by
// another one within t seconds
// (e. g. "x- - - -a- -x" becomes " - -x- - - - - -x",
// because the final x comes within t seconds of the a and thus prevents it from being passed down the chain)
.subscribe((Boolean ok) -> {
if (ok) {
System.out.println("Value avg is ok!");
} else {
System.out.println("Value out of threshold!");
}
}, (Throwable t) -> {
System.out.println("error " + t);
}, () -> {
System.out.println("Completed");
});
请注意,当然,debounce
发出的所有项目都延迟了 t 秒(否则它怎么知道在该间隔内没有更新的项目出现?) - 所以 ok 信号是也耽误了您可以通过 (1) 过滤上述流以删除所有 ok 信号和 (2) 将其与仅包含(未延迟的)ok 信号的流合并来克服这个问题。
我对 RXjava 有疑问。 我的系统有三个 observable,它们将信号(是温度传感器)发送到应用函数 zip 的侦听器,然后计算接收到的值的平均值。
我必须实现一个功能,根据参数 "t",平均温度超出范围几毫秒后,系统会发出异常信号。
例如:
a = anomaly
x = average value
- = Second
if t = 3:
x-x-x-x-a-a-x => ok
x-x-x-a-a-a-x => ko
我的代码在这里:
public class Example extends Thread {
@override
public void run() {
/*Create 3 observable*/
Observable<Double> alfa = Observable.create((
Subscriber<? super Double> subscriber) -> {
new ObservableTempStream().start();
});
Observable<Double> bravo = Observable.create((
Subscriber<? super Double> subscriber) -> {
new ObservableTempStream().start();
});
Observable<Double> charlie = Observable.create((
Subscriber<? super Double> subscriber) -> {
new ObservableTempStream().start();
});
/*Create 1 observable that apply func avg with zip*/
ConnectableObservable<Double> averageTempStream = Observable.zip(
alfa, bravo, charlie,
(Double a, Double b, Double c) -> ((a + b + c) / 3)).publish();
averageTempStream.connect();
averageTempStream.subscribe((Double v) -> {
if ((v) < (averageTempSensors - threshold)
|| (v) > (averageTempSensors + threshold)) {
System.out.println("Value out of threshold: " + v);
} else {
System.out.println("Value avg it's ok: " + v);
}
}, (Throwable t) -> {
System.out.println("error " + t);
}, () -> {
System.out.println("Completed");
});
}
}
可以应用什么策略来解决这个问题?
有没有可以和异步流一起使用的函数?
在我的代码中:
每当平均值超出范围时,我都会报告错误的存在(实际上,其中一个传感器发送了尖峰信号)。
相反,只有当平均值超出范围超过 "t" 秒时,我才必须发出错误信号。
非常感谢
这个怎么样:
averageTempStream.map((Double v) -> { // check whether the value is ok
(v > averageTempSensors - threshold) && (v < averageTempSensors + threshold)
})
.distinctUntilChanged() // ignore subsequent identical values
// (e. g. "x-x-x-x-a-a-x" becomes "x- - - -a- -x")
.debounce(t, TimeUnit.SECONDS) // only emit a value if it not followed by
// another one within t seconds
// (e. g. "x- - - -a- -x" becomes " - -x- - - - - -x",
// because the final x comes within t seconds of the a and thus prevents it from being passed down the chain)
.subscribe((Boolean ok) -> {
if (ok) {
System.out.println("Value avg is ok!");
} else {
System.out.println("Value out of threshold!");
}
}, (Throwable t) -> {
System.out.println("error " + t);
}, () -> {
System.out.println("Completed");
});
请注意,当然,debounce
发出的所有项目都延迟了 t 秒(否则它怎么知道在该间隔内没有更新的项目出现?) - 所以 ok 信号是也耽误了您可以通过 (1) 过滤上述流以删除所有 ok 信号和 (2) 将其与仅包含(未延迟的)ok 信号的流合并来克服这个问题。