RxJava 去抖任意值
RxJava debounce by arbitrary value
仍在寻找正确使用不同 Rx* 运算符的方法并偶然发现了以下问题:
我收集了以下类型的模型:
class Model {
final long timestamp;
final Object data;
public Model(long timestamp, Object data) {
this.timestamp = timestamp;
this.data = data;
}
}
此集合按升序排序(按时间戳排序)。
我的目标是按 "sequences" 对它们进行分组。 "Sequence" - 是元素序列,其中每个元素都非常接近其邻居:
----A-B-C-----D-E-F---H-I--->
在这种情况下,我有 3 个 "sequences"。轴上的位置由模型的 timestamp
属性(不是发射时间)定义。形成序列的最大距离应该是可配置的。
或者让我们举个更真实的例子:
List<Model> models = new ArrayList<Model>(10) {{
add(new Model(0, null));
add(new Model(5, null));
add(new Model(10, null));
add(new Model(100, null));
add(new Model(108, null));
add(new Model(111, null));
add(new Model(115, null));
add(new Model(200, null));
add(new Model(201, null));
add(new Model(202, null));
}};
在这种情况下,对于 10 毫秒的最大距离,我会得到 3 个序列 - (0,5,10) , (100,108,111,115) , (200,201,202)
这个逻辑和debounce
操作符很相似。但不是实时去抖动,我需要通过一些自定义去抖动 属性.
如果时间戳代表发射时间,我会这样做:
List<Model> models = new ArrayList<Model>(10) {{
add(new Model(0, null));
add(new Model(5, null));
add(new Model(10, null));
add(new Model(100, null));
add(new Model(108, null));
add(new Model(111, null));
add(new Model(115, null));
add(new Model(200, null));
add(new Model(201, null));
add(new Model(202, null));
}};
Observable<Model> modelsObservable = Observable.from(models).share();
modelsObservable.buffer(modelsObservable.debounce(10, TimeUnit.MILLISECONDS))
.subscribe(group -> {
//this is one of my groups
});
它不一定需要去抖动 - 我也在看 groupBy
运算符,但我无法找出正确的分组标准..
有点不合常规,但您可以在这里使用 TestScheduler
,根据数据值安排值发射,然后使用此调度程序的去抖动技巧并提前虚拟时间。
TestScheduler s = new TestScheduler();
Scheduler.Worker w = s.createWorker();
PublishSubject<Object> subject = PublishSubject.create();
for (Model m : model) {
w.schedule(() -> subject.onNext(m.data),
m.timestamp, TimeUnit.MILLISECONDS);
}
subject.buffer(subject.debounce(10, TimeUnit.MILLISECONDS, s))
.subscribe(list -> ...);
s.advanceTimeBy(Long.MAX_VALUE / 2, TimeUnit.MILLISECONDS);
w.unsubscribe();
(曾尝试在 RxJava 中实现虚拟时间调度程序,但讨论被放弃,提议的实现被拒绝。)
我不会 fiddle 使用调度程序,但会利用 Buffer/Window(取决于您是否需要下游可观察对象或集合)和扫描。
在 Rx.Net 中,您可以通过以下方式实现它:
var models = new[] { 0, 5, 10, 100, 108, 111, 115, 200, 201, 202 }
.ToObservable();
var enrichedModels = models.Scan(
new { Current = -1, Prev = -1 },
(acc, cur) => new { Current = cur, Prev = acc.Current })
.Skip(1).Publish();
enrichedModels.Buffer(() => enrichedModels.SkipWhile(em => em.Current < em.Prev + 10))
.Select(seq => seq.Select(em => em.Prev))
.Subscribe(seq =>
{
Console.WriteLine(String.Join(",", seq));
});
enrichedModels.Connect();
结果:
0,5,10
100,108,111,115
200,201
Publish/Connect 如果你的 source observable 很热,可能会被跳过。
rx-java 拥有相同的运算符,但不是匿名类型,我想它们可以用元组或具体的 class.
代替
仍在寻找正确使用不同 Rx* 运算符的方法并偶然发现了以下问题:
我收集了以下类型的模型:
class Model {
final long timestamp;
final Object data;
public Model(long timestamp, Object data) {
this.timestamp = timestamp;
this.data = data;
}
}
此集合按升序排序(按时间戳排序)。
我的目标是按 "sequences" 对它们进行分组。 "Sequence" - 是元素序列,其中每个元素都非常接近其邻居:
----A-B-C-----D-E-F---H-I--->
在这种情况下,我有 3 个 "sequences"。轴上的位置由模型的 timestamp
属性(不是发射时间)定义。形成序列的最大距离应该是可配置的。
或者让我们举个更真实的例子:
List<Model> models = new ArrayList<Model>(10) {{
add(new Model(0, null));
add(new Model(5, null));
add(new Model(10, null));
add(new Model(100, null));
add(new Model(108, null));
add(new Model(111, null));
add(new Model(115, null));
add(new Model(200, null));
add(new Model(201, null));
add(new Model(202, null));
}};
在这种情况下,对于 10 毫秒的最大距离,我会得到 3 个序列 - (0,5,10) , (100,108,111,115) , (200,201,202)
这个逻辑和debounce
操作符很相似。但不是实时去抖动,我需要通过一些自定义去抖动 属性.
如果时间戳代表发射时间,我会这样做:
List<Model> models = new ArrayList<Model>(10) {{
add(new Model(0, null));
add(new Model(5, null));
add(new Model(10, null));
add(new Model(100, null));
add(new Model(108, null));
add(new Model(111, null));
add(new Model(115, null));
add(new Model(200, null));
add(new Model(201, null));
add(new Model(202, null));
}};
Observable<Model> modelsObservable = Observable.from(models).share();
modelsObservable.buffer(modelsObservable.debounce(10, TimeUnit.MILLISECONDS))
.subscribe(group -> {
//this is one of my groups
});
它不一定需要去抖动 - 我也在看 groupBy
运算符,但我无法找出正确的分组标准..
有点不合常规,但您可以在这里使用 TestScheduler
,根据数据值安排值发射,然后使用此调度程序的去抖动技巧并提前虚拟时间。
TestScheduler s = new TestScheduler();
Scheduler.Worker w = s.createWorker();
PublishSubject<Object> subject = PublishSubject.create();
for (Model m : model) {
w.schedule(() -> subject.onNext(m.data),
m.timestamp, TimeUnit.MILLISECONDS);
}
subject.buffer(subject.debounce(10, TimeUnit.MILLISECONDS, s))
.subscribe(list -> ...);
s.advanceTimeBy(Long.MAX_VALUE / 2, TimeUnit.MILLISECONDS);
w.unsubscribe();
(曾尝试在 RxJava 中实现虚拟时间调度程序,但讨论被放弃,提议的实现被拒绝。)
我不会 fiddle 使用调度程序,但会利用 Buffer/Window(取决于您是否需要下游可观察对象或集合)和扫描。
在 Rx.Net 中,您可以通过以下方式实现它:
var models = new[] { 0, 5, 10, 100, 108, 111, 115, 200, 201, 202 }
.ToObservable();
var enrichedModels = models.Scan(
new { Current = -1, Prev = -1 },
(acc, cur) => new { Current = cur, Prev = acc.Current })
.Skip(1).Publish();
enrichedModels.Buffer(() => enrichedModels.SkipWhile(em => em.Current < em.Prev + 10))
.Select(seq => seq.Select(em => em.Prev))
.Subscribe(seq =>
{
Console.WriteLine(String.Join(",", seq));
});
enrichedModels.Connect();
结果:
0,5,10
100,108,111,115
200,201
Publish/Connect 如果你的 source observable 很热,可能会被跳过。 rx-java 拥有相同的运算符,但不是匿名类型,我想它们可以用元组或具体的 class.
代替