Spark Streaming 设计问题
Spark Streaming Desinging Questiion
我是 spark 的新手。我想做火花流设置来检索以下格式文件的键值对:
文件:info1
注意:每个信息文件将包含大约 1000 条这样的记录。我们的系统不断生成这些信息文件。通过 spark streaming,我想映射行号和信息文件,并希望获得聚合结果。
我们能给spark cluster这样的文件输入吗?我只对 "SF" 和 "DA" 分隔符感兴趣,"SF" 对应于源文件,"DA" 对应于(行号,计数)。
因为这个输入数据不是行格式,所以将这些文件用于 spark 输入是个好主意,还是我需要做一些中间阶段,我需要清理这些文件以生成新文件将每条记录信息排成一行而不是块?
或者我们可以在 Spark 本身中实现这一点吗?正确的方法应该是什么?
我想达到什么目的?
我想获得行级信息。意思是,获取行(作为键)和信息文件(作为值)
我想要的最终输出如下:
line178 -> (info1, info2, info7.................)
第 2908 行 -> (info3, info90, ..., ... ,)
如果我的解释不清楚或遗漏了什么,请告诉我。
感谢和问候,
文帝
你可以这样做。拥有您的 DStream 流:
// this gives you DA & FP lines, with the line number as the key
val validLines = stream.map(_.split(":")).
filter(line => Seq("DA", "FP").contains(line._1)).
map(_._2.split(","))
map(line => (line._1, line._2))
// now you should accumulate values
val state = validLines.updateStateByKey[Seq[String]](updateFunction _)
def updateFunction(newValues: Seq[Seq[String]], runningValues: Option[Seq[String]]): Option[Seq[String]] = {
// add the new values
val newVals = runnigValues match {
case Some(list) => list :: newValues
case _ => newValues
}
Some(newVals)
}
这应该为每个键累积一个具有关联值的序列,并将其存储在 state
我是 spark 的新手。我想做火花流设置来检索以下格式文件的键值对:
文件:info1
注意:每个信息文件将包含大约 1000 条这样的记录。我们的系统不断生成这些信息文件。通过 spark streaming,我想映射行号和信息文件,并希望获得聚合结果。
我们能给spark cluster这样的文件输入吗?我只对 "SF" 和 "DA" 分隔符感兴趣,"SF" 对应于源文件,"DA" 对应于(行号,计数)。
因为这个输入数据不是行格式,所以将这些文件用于 spark 输入是个好主意,还是我需要做一些中间阶段,我需要清理这些文件以生成新文件将每条记录信息排成一行而不是块?
或者我们可以在 Spark 本身中实现这一点吗?正确的方法应该是什么?
我想达到什么目的? 我想获得行级信息。意思是,获取行(作为键)和信息文件(作为值)
我想要的最终输出如下: line178 -> (info1, info2, info7.................)
第 2908 行 -> (info3, info90, ..., ... ,)
如果我的解释不清楚或遗漏了什么,请告诉我。
感谢和问候, 文帝
你可以这样做。拥有您的 DStream 流:
// this gives you DA & FP lines, with the line number as the key
val validLines = stream.map(_.split(":")).
filter(line => Seq("DA", "FP").contains(line._1)).
map(_._2.split(","))
map(line => (line._1, line._2))
// now you should accumulate values
val state = validLines.updateStateByKey[Seq[String]](updateFunction _)
def updateFunction(newValues: Seq[Seq[String]], runningValues: Option[Seq[String]]): Option[Seq[String]] = {
// add the new values
val newVals = runnigValues match {
case Some(list) => list :: newValues
case _ => newValues
}
Some(newVals)
}
这应该为每个键累积一个具有关联值的序列,并将其存储在 state