Spark 上下文不可序列化?

Spark Context not Serializable?

因此,我在 Spark 中遇到了臭名昭著的 Task Not Serializable 错误。这是相关的代码块:

val labeledPoints: RDD[LabeledPoint] = events.map(event => {

    var eventsPerEntity = try {
          HBaseHelper.scan(...filter entity here...)(sc).map(newEvent =>  {

            Try(new Object(...))
          }).filter(_.isSuccess).map(_.get)
        } catch {
          case e: Exception => {
            logger.error(s"Failed to convert event ${event}." +
              s"Exception: ${e}.")
            throw e
          }
        }
    })

基本上我想要实现的是我正在访问 sc,这是我在 map 中的 Spark 上下文对象。在运行时,我收到 Task Not Serializable 错误。

这是我能想到的潜在解决方案:

不使用 sc 查询 HBase,我可以这样做,但反过来我会有一个列表。 (如果我尝试并行化;我必须再次使用 sc)。有一个列表将导致我无法使用 reduceByKey,建议 in my other question. So I could not succesfully achieve this one as well, as I don't know how I would achieve 没有 reduceByKey。另外我真的很想使用 RDDs :)

所以我正在寻找另一种解决方案+询问我是否做错了什么。提前致谢!

更新

基本上,我的问题变成了这样:

我有一个名为 eventsRDD。这就是整个 HBase table。注意:每个 event 都由 performerId 执行,它又是 event 中的一个字段,即 event.performerId.

对于 events 中的每个 event,我需要计算 event.numericColumneventnumericColumn 的平均值的比率(子集events) 由相同的 performerId.

执行

我在映射 events 时尝试这样做。在 map 内,我试图根据 performerId.

过滤事件

基本上,我正在尝试将每个 event 转换为 LabeledPoint,上面的比率将成为我 Vector 中的一个特征。即对于每个事件,我都试图获得

// I am trying to calculate the average, but cannot use filter, because I am in map block.

LabeledPoint(
  event.someColumn,
  Vectors.dense(
    averageAbove,
    ...
  )
)

如有任何帮助,我将不胜感激。谢谢!

您可以将列表添加为事件的新字段 - 通过获取新的 RDD(事件+实体列表)。然后,您可以使用常规 Spark 命令 "explode" 列表,从而获得多个事件+列表项记录(尽管使用 DataFrames/DataSets 比使用 RDD 更容易做到这一点)

一个选项(如果适用)正在加载 整个 HBase table(或者 - 可能与 events 中的事件之一匹配的所有元素RDD,如果你有任何方法可以在不通过 RDD 的情况下将它们隔离)到数据帧中,然后使用 join.

要将数据从 HBase table 加载到 Dataframe,您可以使用 Hortonworks 的预览 Spark-HBase Connector。然后,在两个数据帧之间执行正确的连接操作应该很容易。

很简单,你不能在 RDD Closure 上使用 spark 上下文,所以找到另一种方法来处理这个问题。