如何在 Dataflow 中将 HashMap 作为侧输入传递
How to pass a HashMap as side input in Dataflow
我正在尝试将 HashMap 作为数据流管道中的侧输入传递。除了少数通过 String、Int 或 Long 的示例外,我找不到任何示例。我的代码:
tagList = pipeline.apply(TextIO.Read.named("tagListTextRead").from("gs://mybucket/tag-list.json"));
PCollection<Map<String,TagObject>> tagMap = tagList
.apply(ParDo.named("allTagsToTagMap").of(new Tags.BuildTagListMapFn()));
PCollectionView<Map<String, TagObject>> tagMapView =
allTags.apply(View.<String, TagObject>asMap());
第三个语句给出语法错误。
The method apply(PTransform<? super PCollection<Map<String,TagObject>>,OutputT>) in the type
PCollection<Map<String,TagObject>> is not applicable for the arguments
(View.AsMap<String,TagObject>)
谁能告诉我如何将 HashMap 作为侧输入传递到数据流管道中。
根据您的管道的详细信息,这里有两个不同的答案。
如果你有一个 PCollection<KV<K, V>>
那么你可以使用 View.asMap()
来产生一个 PCollectionView<Map<K, V>>
。无需自己构建 Map
。
如果你有一个只有一个元素的 PCollection<Map<K, V>>
那么你可以使用 View.asSingleton()
来辅助输入。
第一个可能是最自然的,您的代码最终看起来像
PCollectionView<Map<String, TagObject>> = pipeline
.apply("tagListTextRead", TextIO.Read.from("gs://mybucket/tag-list.json"))
.apply("tagsToKv", new Tags.TagToKvFunction())
.apply("viewTags", View.<String, TagObject>asMap())
扩展它以显示中间值的类型:
PCollection<String> rawTags =
pipeline.apply("tagListTextRead", TextIO.Read.from("gs://mybucket/tag-list.json"))
PCollection<KV<String, TagObject>> kvs =
rawTags.apply("tagsToKv", new Tags.TagToKvFunction())
PCollectionView<Map<String, TagObject>> =
kvs.apply("viewTags", View.<String, TagObject>asMap())
我正在尝试将 HashMap 作为数据流管道中的侧输入传递。除了少数通过 String、Int 或 Long 的示例外,我找不到任何示例。我的代码:
tagList = pipeline.apply(TextIO.Read.named("tagListTextRead").from("gs://mybucket/tag-list.json"));
PCollection<Map<String,TagObject>> tagMap = tagList
.apply(ParDo.named("allTagsToTagMap").of(new Tags.BuildTagListMapFn()));
PCollectionView<Map<String, TagObject>> tagMapView =
allTags.apply(View.<String, TagObject>asMap());
第三个语句给出语法错误。
The method apply(PTransform<? super PCollection<Map<String,TagObject>>,OutputT>) in the type
PCollection<Map<String,TagObject>> is not applicable for the arguments
(View.AsMap<String,TagObject>)
谁能告诉我如何将 HashMap 作为侧输入传递到数据流管道中。
根据您的管道的详细信息,这里有两个不同的答案。
如果你有一个
PCollection<KV<K, V>>
那么你可以使用View.asMap()
来产生一个PCollectionView<Map<K, V>>
。无需自己构建Map
。如果你有一个只有一个元素的
PCollection<Map<K, V>>
那么你可以使用View.asSingleton()
来辅助输入。
第一个可能是最自然的,您的代码最终看起来像
PCollectionView<Map<String, TagObject>> = pipeline
.apply("tagListTextRead", TextIO.Read.from("gs://mybucket/tag-list.json"))
.apply("tagsToKv", new Tags.TagToKvFunction())
.apply("viewTags", View.<String, TagObject>asMap())
扩展它以显示中间值的类型:
PCollection<String> rawTags =
pipeline.apply("tagListTextRead", TextIO.Read.from("gs://mybucket/tag-list.json"))
PCollection<KV<String, TagObject>> kvs =
rawTags.apply("tagsToKv", new Tags.TagToKvFunction())
PCollectionView<Map<String, TagObject>> =
kvs.apply("viewTags", View.<String, TagObject>asMap())