如何在 google 云数据流上使用另一个管道值搜索管道
How can I search pipeline with another pipeline value on google cloud dataflow
我想使用 google 云数据流从流数据中搜索包含指定单词的文本。
具体我会处理下面两个stream。
- 流 A:流的元素是 "word"
- 流 B:流的元素是 "text"。每个文本由 "word" 组成。此文本可能在流 A
上有 "word"
很多"text"经常流入流B。另一方面,"word"偶尔流入A流。
当"word"流入流A时,我想搜索"text"有"word"并在5分钟前流入流B。
例子
time stream A : stream B
00:01 - this is an apple
00:02 - this is an orange
00:03 - I have an apple
00:04 apple <= "this is an apple" and "I have an apple" are found
00:05 this <= "this is an apple" and "this is an orange" are found
我可以使用 google 云数据流搜索文本吗?
如果我正确理解你的问题,有多种方法可以实现你想要的东西。我将描述两种变体。
我的示例代码中的基本思想是使用内部联接和 SlidingWindows
五分钟。您可以使用 ParDo
侧输入或 CoGroupByKey
实现连接,具体取决于您的数据大小。
这是设置输入和 windowing 的方法:
PCollection<String> streamA = ...;
PCollection<String> streamB = ...;
PCollection<String> windowedStreamA = streamA.apply(
Window.into(
SlidingWindows.of(Duration.standardMinutes(5)).every(...)));
PCollection<String> windowedStreamB = streamB.apply(
Window.into(
SlidingWindows.of(Duration.standardMinutes(5)).every(...)));
您可能需要调整 windows 或句点的大小以满足您的规格和性能需求。
下面是如何使用侧输入进行连接的草图。对于 streamA
的每个元素,这将遍历 streamB
的整个五分钟 window,因此如果 windows 变大,性能将受到影响。
PCollectionView<Iterable<String>> streamBview = streamB.apply(View.asIterable());
PCollection<String> matches = windowedStreamA.apply(
ParDo.of(new DoFn<String, String>() {
@Override void processElement(ProcessContext context) {
for (String text : context.sideInput()) {
if (split(text).contains(context.element())) {
context.output(text);
}
}
}
});
下面是如何使用 CoGroupByKey
执行此操作的草图,方法是预先拆分文本并将每个关键字与包含该关键字的行连接起来。 SDK 附带的 TfIdf
示例中有类似的逻辑。
PCollection<KV<String, Void>> keyedStreamA = windowedStreamA.apply(
MapElements
.via(word -> KV.of(word, null))
.withOutputType(new TypeDescriptor<KV<String, Void>>() {}));
PCollection<KV<String, String>> keyedStreamB = windowedStreamB.apply(
FlatMapElements
.via(text -> split(text).forEach(word --> KV.of(word, text))
.withOutputType(new TypeDescriptor<KV<String, String>>() {}));
TupleTag<Void> tagA = new TupleTag<Void>() {};
TupleTag<String> tagB = new TupleTag<String>() {};
KeyedPCollectionTuple coGbkInput = KeyedPCollectionTuple
.of(tagA, keyedStreamA)
.and(tagB, keyedStreamB);
PCollection<String> matches = coGbkInput
.apply(CoGroupByKey.create())
.apply(FlatMapElements
.via(result -> result.getAll(tagB))
.withOutputType(new TypeDescriptor<String>()));
最佳方法取决于您的数据。如果您可以接受比最后五分钟更多的匹配,您可以通过扩大滑动 windows 和更大的周期来调整 windows 中的数据重复量。您还可以使用触发器来调整何时产生输出。
我想使用 google 云数据流从流数据中搜索包含指定单词的文本。
具体我会处理下面两个stream。
- 流 A:流的元素是 "word"
- 流 B:流的元素是 "text"。每个文本由 "word" 组成。此文本可能在流 A 上有 "word"
很多"text"经常流入流B。另一方面,"word"偶尔流入A流。
当"word"流入流A时,我想搜索"text"有"word"并在5分钟前流入流B。
例子
time stream A : stream B
00:01 - this is an apple
00:02 - this is an orange
00:03 - I have an apple
00:04 apple <= "this is an apple" and "I have an apple" are found
00:05 this <= "this is an apple" and "this is an orange" are found
我可以使用 google 云数据流搜索文本吗?
如果我正确理解你的问题,有多种方法可以实现你想要的东西。我将描述两种变体。
我的示例代码中的基本思想是使用内部联接和 SlidingWindows
五分钟。您可以使用 ParDo
侧输入或 CoGroupByKey
实现连接,具体取决于您的数据大小。
这是设置输入和 windowing 的方法:
PCollection<String> streamA = ...;
PCollection<String> streamB = ...;
PCollection<String> windowedStreamA = streamA.apply(
Window.into(
SlidingWindows.of(Duration.standardMinutes(5)).every(...)));
PCollection<String> windowedStreamB = streamB.apply(
Window.into(
SlidingWindows.of(Duration.standardMinutes(5)).every(...)));
您可能需要调整 windows 或句点的大小以满足您的规格和性能需求。
下面是如何使用侧输入进行连接的草图。对于 streamA
的每个元素,这将遍历 streamB
的整个五分钟 window,因此如果 windows 变大,性能将受到影响。
PCollectionView<Iterable<String>> streamBview = streamB.apply(View.asIterable());
PCollection<String> matches = windowedStreamA.apply(
ParDo.of(new DoFn<String, String>() {
@Override void processElement(ProcessContext context) {
for (String text : context.sideInput()) {
if (split(text).contains(context.element())) {
context.output(text);
}
}
}
});
下面是如何使用 CoGroupByKey
执行此操作的草图,方法是预先拆分文本并将每个关键字与包含该关键字的行连接起来。 SDK 附带的 TfIdf
示例中有类似的逻辑。
PCollection<KV<String, Void>> keyedStreamA = windowedStreamA.apply(
MapElements
.via(word -> KV.of(word, null))
.withOutputType(new TypeDescriptor<KV<String, Void>>() {}));
PCollection<KV<String, String>> keyedStreamB = windowedStreamB.apply(
FlatMapElements
.via(text -> split(text).forEach(word --> KV.of(word, text))
.withOutputType(new TypeDescriptor<KV<String, String>>() {}));
TupleTag<Void> tagA = new TupleTag<Void>() {};
TupleTag<String> tagB = new TupleTag<String>() {};
KeyedPCollectionTuple coGbkInput = KeyedPCollectionTuple
.of(tagA, keyedStreamA)
.and(tagB, keyedStreamB);
PCollection<String> matches = coGbkInput
.apply(CoGroupByKey.create())
.apply(FlatMapElements
.via(result -> result.getAll(tagB))
.withOutputType(new TypeDescriptor<String>()));
最佳方法取决于您的数据。如果您可以接受比最后五分钟更多的匹配,您可以通过扩大滑动 windows 和更大的周期来调整 windows 中的数据重复量。您还可以使用触发器来调整何时产生输出。