如何在 RxJs 中组合两个流等等?

How to combine two streams in RxJs and a bit more?

我有反应形式。当我打字时停了 3 秒 - 它应该 console.log(form.value)。但是,当我单击 submit 时 — console.log(form.value) 应该立即记录下来,并且还可以防止之前的 console.log() 等待 3 秒。在我输入之后 - 而不是按 submit - 它应该在 3 秒后再次 console.log()

我是 RxJs 的新手。我刚刚弄清楚如何使用 debounceTime() 运算符制作第一部分。但是我无法理解如何 "inject" 将事件提交到表单流中并阻止它 console.log();

Link on stackblitz

component.html

<form [formGroup]="form" (ngSubmit)="onSubmit()">
  <input type="text" formControlName="name" placeholder="name"><br>
  <input type="text" formControlName="age" placeholder="age"><br>

  <button>Save</button>
</form>

component.ts

export class AppComponent implements OnInit  {
  form: FormGroup;

  constructor(private fb: FormBuilder) {}

  ngOnInit() {
    this.form = this.fb.group({
      'name': [],
      'age': []
    });

    this.form.valueChanges.pipe(
      debounceTime(3000)
    ).subscribe(value => console.log(value));
  }

  onSubmit() {
    console.log(this.form.value);
  }
}

预期行为:

当我打字时,停 2 秒 - 没有任何反应,如果我停 3 秒或更长时间,则 console.log(form.value);

当按下 submit 时应该 console.log(form.value) 立即取消第一个案例的先前等待。

提交表单时,您可以简单地从 Observable unsubscribe

Check it out

export class AppComponent implements OnInit  {
  form: FormGroup;

  subscription;

  constructor(private fb: FormBuilder) {}

  ngOnInit() {
    this.form = this.fb.group({
      'name': [],
      'age': []
    });

    this.subscription = this.form.valueChanges.pipe(
      debounceTime(3000)
    ).subscribe(value => console.log(value));
  }

  onSubmit() {
    this.subscription.unsubscribe();
    console.log(this.form.value);
  }
}

解决方法: https://stackblitz.com/edit/angular-gn1dwx

所以,在流中思考,我们有:

  • 提交流
  • 去抖动变化事件流

我们想避免重复的事件。因此,我们合并了 2 个流,并添加了 distinctUntilChanged 运算符以避免重复值。我们 serialize/deserialize 因为我们不希望 distinct 对实例进行操作,而是对值进行操作。

为了在这个组件被销毁时清理我们的订阅,我们使用 takeUntil 来自动关闭 observable,它会自动处理我们的订阅(总是清理订阅!)

注意:修改一个值,等待4秒再点击提交不会触发订阅。我认为这是预期的行为。