如何在 google 云数据流上使用另一个管道值搜索管道

How can I search pipeline with another pipeline value on google cloud dataflow

我想使用 google 云数据流从流数据中搜索包含指定单词的文本。

具体我会处理下面两个stream。

很多"text"经常流入流B。另一方面,"word"偶尔流入A流。

当"word"流入流A时,我想搜索"text"有"word"并在5分钟前流入流B。

例子

time  stream A : stream B
00:01 -          this is an apple
00:02 -          this is an orange
00:03 -          I have an apple
00:04 apple                        <= "this is an apple" and "I have an apple" are found
00:05 this                         <= "this is an apple" and "this is an orange" are found

我可以使用 google 云数据流搜索文本吗?

如果我正确理解你的问题,有多种方法可以实现你想要的东西。我将描述两种变体。

我的示例代码中的基本思想是使用内部联接和 SlidingWindows 五分钟。您可以使用 ParDo 侧输入或 CoGroupByKey 实现连接,具体取决于您的数据大小。

这是设置输入和 windowing 的方法:

PCollection<String> streamA = ...; 
PCollection<String> streamB = ...;

PCollection<String> windowedStreamA = streamA.apply(
    Window.into(
        SlidingWindows.of(Duration.standardMinutes(5)).every(...)));

PCollection<String> windowedStreamB = streamB.apply(
    Window.into(
        SlidingWindows.of(Duration.standardMinutes(5)).every(...)));

您可能需要调整 windows 或句点的大小以满足您的规格和性能需求。

下面是如何使用侧输入进行连接的草图。对于 streamA 的每个元素,这将遍历 streamB 的整个五分钟 window,因此如果 windows 变大,性能将受到影响。

PCollectionView<Iterable<String>> streamBview = streamB.apply(View.asIterable());

PCollection<String> matches = windowedStreamA.apply(
    ParDo.of(new DoFn<String, String>() {
      @Override void processElement(ProcessContext context) {
        for (String text : context.sideInput()) {
          if (split(text).contains(context.element())) {
            context.output(text);
          }
        }
      }
    });

下面是如何使用 CoGroupByKey 执行此操作的草图,方法是预先拆分文本并将每个关键字与包含该关键字的行连接起来。 SDK 附带的 TfIdf 示例中有类似的逻辑。

PCollection<KV<String, Void>> keyedStreamA = windowedStreamA.apply(
    MapElements
        .via(word -> KV.of(word, null))
        .withOutputType(new TypeDescriptor<KV<String, Void>>() {}));

PCollection<KV<String, String>> keyedStreamB = windowedStreamB.apply(
    FlatMapElements
        .via(text -> split(text).forEach(word --> KV.of(word, text))
        .withOutputType(new TypeDescriptor<KV<String, String>>() {}));


TupleTag<Void> tagA = new TupleTag<Void>() {};
TupleTag<String> tagB = new TupleTag<String>() {};

KeyedPCollectionTuple coGbkInput = KeyedPCollectionTuple
    .of(tagA, keyedStreamA)
    .and(tagB, keyedStreamB);

PCollection<String> matches = coGbkInput
    .apply(CoGroupByKey.create())
    .apply(FlatMapElements
        .via(result -> result.getAll(tagB))
        .withOutputType(new TypeDescriptor<String>()));

最佳方法取决于您的数据。如果您可以接受比最后五分钟更多的匹配,您可以通过扩大滑动 windows 和更大的周期来调整 windows 中的数据重复量。您还可以使用触发器来调整何时产生输出。