自第一个新组元素以来超时的 Rx 缓冲区
Rx Buffer with timeout since first new group element
对 Rx 世界还很陌生,我需要实现以下行为:
我需要 observable 来收集值并在我至少有 N 项后将它们作为列表发出,或者如果自 组中的第一项 被发射。
我一遍又一遍地阅读文档,很确定它会用
buffer(timespan, unit, count[, scheduler])
但问题是这里的时间跨度取决于 last 组项目。
如果可能的话,我还需要能够刷新(强制发射)当前缓冲区,一些项目需要立即处理。我是否正确地假设,对于这种情况,我需要第二个可观察对象,并在每个项目之前执行处理并合并两者?
有什么想法吗?
Ps:我在Java工作,但我不需要Java代码,一个解释就够了。
谢谢!
这个问题的缓冲方面可以通过多播技巧来实现,但我发现为它编写一个运算符要容易得多,这样数据和上下文就在同一个可访问的地方:
public final class OperatorBufferFirst<T> implements Operator<List<T>, T> {
final Scheduler scheduler;
final long timeout;
final TimeUnit unit;
final int maxSize;
public OperatorBufferFirst(
long timeout, TimeUnit unit,
Scheduler scheduler, int maxSize) {
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
this.maxSize = maxSize;
}
@Override
public Subscriber<? super T> call(
Subscriber<? super List<T>> t) {
BufferSubscriber<T> parent = new BufferSubscriber<>(
new SerializedSubscriber<>(t),
timeout, unit,
scheduler.createWorker(), maxSize);
t.add(parent);
return parent;
}
static final class BufferSubscriber<T>
extends Subscriber<T> {
final Subscriber<? super List<T>> actual;
final Scheduler.Worker w;
final long timeout;
final TimeUnit unit;
final int maxSize;
final SerialSubscription timer;
List<T> buffer;
long index;
public BufferSubscriber(
Subscriber<? super List<T>> actual,
long timeout,
TimeUnit unit,
Scheduler.Worker w,
int maxSize) {
this.actual = actual;
this.timeout = timeout;
this.unit = unit;
this.w = w;
this.maxSize = maxSize;
this.timer = new SerialSubscription();
this.buffer = new ArrayList<>();
this.add(timer);
this.add(w);
}
@Override
public void onNext(T t) {
List<T> b;
boolean startTimer = false;
boolean emit = false;
long idx;
synchronized (this) {
b = buffer;
b.add(t);
idx = index;
int n = b.size();
if (n == 1) {
startTimer = true;
} else
if (n < maxSize) {
return;
} else {
buffer = new ArrayList<>();
index = ++idx;
emit = true;
}
}
if (startTimer) {
final long fidx = idx;
timer.set(w.schedule(() -> timeout(fidx), timeout, unit));
}
if (emit) {
timer.set(Subscriptions.unsubscribed());
actual.onNext(b);
}
}
@Override
public void onError(Throwable e) {
actual.onError(e);
}
@Override
public void onCompleted() {
timer.unsubscribe();
List<T> b;
synchronized (this) {
b = buffer;
buffer = null;
index++;
}
if (!b.isEmpty()) {
actual.onNext(b);
}
actual.onCompleted();
}
public void timeout(long idx) {
List<T> b;
synchronized (this) {
b = buffer;
if (idx != index) {
return;
}
buffer = new ArrayList<>();
index = idx + 1;
}
actual.onNext(b);
}
}
public static void main(String[] args) {
TestScheduler s = Schedulers.test();
PublishSubject<Integer> source = PublishSubject.create();
source.lift(new OperatorBufferFirst<>(1, TimeUnit.SECONDS, s, 3))
.subscribe(System.out::println, Throwable::printStackTrace,
() -> System.out.println("Done"));
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
s.advanceTimeBy(1, TimeUnit.SECONDS);
source.onNext(5);
source.onNext(6);
s.advanceTimeBy(1, TimeUnit.SECONDS);
s.advanceTimeBy(1, TimeUnit.SECONDS);
source.onNext(7);
source.onCompleted();
}
}
它将值累积到列表中,并为第一个元素启动定时任务,或者如果缓冲区已满则发出缓冲区。
至于flush,一般不能这么简单,必须和运营商建立协议,如果传入的T值是某种特殊的,就说flush。例如,你在某处有一个 T 类型的 FLUSH 常量,每当操作符遇到它时,它应该发出当前缓冲区:
synchronized (this) {
b = buffer;
idx = index;
if (t != FLUSH) {
b.add(t);
int n = b.size();
if (n == 1) {
startTimer = true;
} else
if (n < maxSize) {
return;
} else {
buffer = new ArrayList<>();
index = ++idx;
emit = true;
}
} else {
buffer = new ArrayList<>();
index = ++idx;
emit = true;
}
}
对 Rx 世界还很陌生,我需要实现以下行为:
我需要 observable 来收集值并在我至少有 N 项后将它们作为列表发出,或者如果自 组中的第一项 被发射。
我一遍又一遍地阅读文档,很确定它会用
buffer(timespan, unit, count[, scheduler])
但问题是这里的时间跨度取决于 last 组项目。
如果可能的话,我还需要能够刷新(强制发射)当前缓冲区,一些项目需要立即处理。我是否正确地假设,对于这种情况,我需要第二个可观察对象,并在每个项目之前执行处理并合并两者?
有什么想法吗?
Ps:我在Java工作,但我不需要Java代码,一个解释就够了。
谢谢!
这个问题的缓冲方面可以通过多播技巧来实现,但我发现为它编写一个运算符要容易得多,这样数据和上下文就在同一个可访问的地方:
public final class OperatorBufferFirst<T> implements Operator<List<T>, T> {
final Scheduler scheduler;
final long timeout;
final TimeUnit unit;
final int maxSize;
public OperatorBufferFirst(
long timeout, TimeUnit unit,
Scheduler scheduler, int maxSize) {
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
this.maxSize = maxSize;
}
@Override
public Subscriber<? super T> call(
Subscriber<? super List<T>> t) {
BufferSubscriber<T> parent = new BufferSubscriber<>(
new SerializedSubscriber<>(t),
timeout, unit,
scheduler.createWorker(), maxSize);
t.add(parent);
return parent;
}
static final class BufferSubscriber<T>
extends Subscriber<T> {
final Subscriber<? super List<T>> actual;
final Scheduler.Worker w;
final long timeout;
final TimeUnit unit;
final int maxSize;
final SerialSubscription timer;
List<T> buffer;
long index;
public BufferSubscriber(
Subscriber<? super List<T>> actual,
long timeout,
TimeUnit unit,
Scheduler.Worker w,
int maxSize) {
this.actual = actual;
this.timeout = timeout;
this.unit = unit;
this.w = w;
this.maxSize = maxSize;
this.timer = new SerialSubscription();
this.buffer = new ArrayList<>();
this.add(timer);
this.add(w);
}
@Override
public void onNext(T t) {
List<T> b;
boolean startTimer = false;
boolean emit = false;
long idx;
synchronized (this) {
b = buffer;
b.add(t);
idx = index;
int n = b.size();
if (n == 1) {
startTimer = true;
} else
if (n < maxSize) {
return;
} else {
buffer = new ArrayList<>();
index = ++idx;
emit = true;
}
}
if (startTimer) {
final long fidx = idx;
timer.set(w.schedule(() -> timeout(fidx), timeout, unit));
}
if (emit) {
timer.set(Subscriptions.unsubscribed());
actual.onNext(b);
}
}
@Override
public void onError(Throwable e) {
actual.onError(e);
}
@Override
public void onCompleted() {
timer.unsubscribe();
List<T> b;
synchronized (this) {
b = buffer;
buffer = null;
index++;
}
if (!b.isEmpty()) {
actual.onNext(b);
}
actual.onCompleted();
}
public void timeout(long idx) {
List<T> b;
synchronized (this) {
b = buffer;
if (idx != index) {
return;
}
buffer = new ArrayList<>();
index = idx + 1;
}
actual.onNext(b);
}
}
public static void main(String[] args) {
TestScheduler s = Schedulers.test();
PublishSubject<Integer> source = PublishSubject.create();
source.lift(new OperatorBufferFirst<>(1, TimeUnit.SECONDS, s, 3))
.subscribe(System.out::println, Throwable::printStackTrace,
() -> System.out.println("Done"));
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
s.advanceTimeBy(1, TimeUnit.SECONDS);
source.onNext(5);
source.onNext(6);
s.advanceTimeBy(1, TimeUnit.SECONDS);
s.advanceTimeBy(1, TimeUnit.SECONDS);
source.onNext(7);
source.onCompleted();
}
}
它将值累积到列表中,并为第一个元素启动定时任务,或者如果缓冲区已满则发出缓冲区。
至于flush,一般不能这么简单,必须和运营商建立协议,如果传入的T值是某种特殊的,就说flush。例如,你在某处有一个 T 类型的 FLUSH 常量,每当操作符遇到它时,它应该发出当前缓冲区:
synchronized (this) {
b = buffer;
idx = index;
if (t != FLUSH) {
b.add(t);
int n = b.size();
if (n == 1) {
startTimer = true;
} else
if (n < maxSize) {
return;
} else {
buffer = new ArrayList<>();
index = ++idx;
emit = true;
}
} else {
buffer = new ArrayList<>();
index = ++idx;
emit = true;
}
}