Apache Flink:如何使用 Flink DataSet 从一个数据集创建两个数据集 API
Apache Flink: How to create two datasets from one dataset using Flink DataSet API
我正在使用 Flink 0.10.1 的 DataSet API 编写应用程序。
我可以在 Flink 中使用单个运算符获取多个收集器吗?
我想做的是如下所示:
val lines = env.readTextFile(...)
val (out_small, out_large) = lines **someOp** {
(iterator, collector1, collector2) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector1.collect(elem1)
collector2.collect(elem2)
}
}
}
目前我正在两次调用 mapPartition 以从一个源数据集生成两个数据集。
val lines = env.readTextFile(...)
val out_small = lines mapPartition {
(iterator, collector) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector.collect(elem1)
}
}
}
val out_large = lines mapPartition {
(iterator, collector) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector.collect(elem2)
}
}
}
由于 doParsing 函数非常昂贵,我想每行只调用一次。
p.s。如果您能让我知道以更简单的方式完成此类工作的其他方法,我将不胜感激。
Flink 不支持多收集器。但是,您可以通过添加指示输出类型的附加字段来更改解析步骤的输出:
val lines = env.readTextFile(...)
val intermediate = lines **someOp** {
(iterator, collector) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector.collect(0, elem1) // 0 indicates small
collector.collect(1, elem2) // 1 indicates large
}
}
}
接下来,您使用输出 intermediate
两次,并针对第一个属性过滤每个输出。第一个过滤器过滤 0
,第二个过滤器过滤 1
(您还添加一个投影来摆脱第一个属性)。
+---> filter("0") --->
|
intermediate --+
|
+---> filter("1") --->
我正在使用 Flink 0.10.1 的 DataSet API 编写应用程序。 我可以在 Flink 中使用单个运算符获取多个收集器吗?
我想做的是如下所示:
val lines = env.readTextFile(...)
val (out_small, out_large) = lines **someOp** {
(iterator, collector1, collector2) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector1.collect(elem1)
collector2.collect(elem2)
}
}
}
目前我正在两次调用 mapPartition 以从一个源数据集生成两个数据集。
val lines = env.readTextFile(...)
val out_small = lines mapPartition {
(iterator, collector) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector.collect(elem1)
}
}
}
val out_large = lines mapPartition {
(iterator, collector) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector.collect(elem2)
}
}
}
由于 doParsing 函数非常昂贵,我想每行只调用一次。
p.s。如果您能让我知道以更简单的方式完成此类工作的其他方法,我将不胜感激。
Flink 不支持多收集器。但是,您可以通过添加指示输出类型的附加字段来更改解析步骤的输出:
val lines = env.readTextFile(...)
val intermediate = lines **someOp** {
(iterator, collector) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector.collect(0, elem1) // 0 indicates small
collector.collect(1, elem2) // 1 indicates large
}
}
}
接下来,您使用输出 intermediate
两次,并针对第一个属性过滤每个输出。第一个过滤器过滤 0
,第二个过滤器过滤 1
(您还添加一个投影来摆脱第一个属性)。
+---> filter("0") --->
|
intermediate --+
|
+---> filter("1") --->