Spark 上下文不可序列化?
Spark Context not Serializable?
因此,我在 Spark 中遇到了臭名昭著的 Task Not Serializable
错误。这是相关的代码块:
val labeledPoints: RDD[LabeledPoint] = events.map(event => {
var eventsPerEntity = try {
HBaseHelper.scan(...filter entity here...)(sc).map(newEvent => {
Try(new Object(...))
}).filter(_.isSuccess).map(_.get)
} catch {
case e: Exception => {
logger.error(s"Failed to convert event ${event}." +
s"Exception: ${e}.")
throw e
}
}
})
基本上我想要实现的是我正在访问 sc
,这是我在 map
中的 Spark 上下文对象。在运行时,我收到 Task Not Serializable
错误。
这是我能想到的潜在解决方案:
不使用 sc
查询 HBase,我可以这样做,但反过来我会有一个列表。 (如果我尝试并行化;我必须再次使用 sc
)。有一个列表将导致我无法使用 reduceByKey
,建议 in my other question. So I could not succesfully achieve this one as well, as I don't know how I would achieve 没有 reduceByKey
。另外我真的很想使用 RDDs :)
所以我正在寻找另一种解决方案+询问我是否做错了什么。提前致谢!
更新
基本上,我的问题变成了这样:
我有一个名为 events
的 RDD
。这就是整个 HBase table。注意:每个 event
都由 performerId
执行,它又是 event
中的一个字段,即 event.performerId
.
对于 events
中的每个 event
,我需要计算 event.numericColumn
与 event
的 numericColumn
的平均值的比率(子集events
) 由相同的 performerId
.
执行
我在映射 events
时尝试这样做。在 map
内,我试图根据 performerId
.
过滤事件
基本上,我正在尝试将每个 event
转换为 LabeledPoint
,上面的比率将成为我 Vector 中的一个特征。即对于每个事件,我都试图获得
// I am trying to calculate the average, but cannot use filter, because I am in map block.
LabeledPoint(
event.someColumn,
Vectors.dense(
averageAbove,
...
)
)
如有任何帮助,我将不胜感激。谢谢!
您可以将列表添加为事件的新字段 - 通过获取新的 RDD(事件+实体列表)。然后,您可以使用常规 Spark 命令 "explode" 列表,从而获得多个事件+列表项记录(尽管使用 DataFrames/DataSets 比使用 RDD 更容易做到这一点)
一个选项(如果适用)正在加载 整个 HBase table(或者 - 可能与 events
中的事件之一匹配的所有元素RDD,如果你有任何方法可以在不通过 RDD 的情况下将它们隔离)到数据帧中,然后使用 join.
要将数据从 HBase table 加载到 Dataframe,您可以使用 Hortonworks 的预览 Spark-HBase Connector。然后,在两个数据帧之间执行正确的连接操作应该很容易。
很简单,你不能在 RDD Closure 上使用 spark 上下文,所以找到另一种方法来处理这个问题。
因此,我在 Spark 中遇到了臭名昭著的 Task Not Serializable
错误。这是相关的代码块:
val labeledPoints: RDD[LabeledPoint] = events.map(event => {
var eventsPerEntity = try {
HBaseHelper.scan(...filter entity here...)(sc).map(newEvent => {
Try(new Object(...))
}).filter(_.isSuccess).map(_.get)
} catch {
case e: Exception => {
logger.error(s"Failed to convert event ${event}." +
s"Exception: ${e}.")
throw e
}
}
})
基本上我想要实现的是我正在访问 sc
,这是我在 map
中的 Spark 上下文对象。在运行时,我收到 Task Not Serializable
错误。
这是我能想到的潜在解决方案:
不使用 sc
查询 HBase,我可以这样做,但反过来我会有一个列表。 (如果我尝试并行化;我必须再次使用 sc
)。有一个列表将导致我无法使用 reduceByKey
,建议 reduceByKey
。另外我真的很想使用 RDDs :)
所以我正在寻找另一种解决方案+询问我是否做错了什么。提前致谢!
更新
基本上,我的问题变成了这样:
我有一个名为 events
的 RDD
。这就是整个 HBase table。注意:每个 event
都由 performerId
执行,它又是 event
中的一个字段,即 event.performerId
.
对于 events
中的每个 event
,我需要计算 event.numericColumn
与 event
的 numericColumn
的平均值的比率(子集events
) 由相同的 performerId
.
我在映射 events
时尝试这样做。在 map
内,我试图根据 performerId
.
基本上,我正在尝试将每个 event
转换为 LabeledPoint
,上面的比率将成为我 Vector 中的一个特征。即对于每个事件,我都试图获得
// I am trying to calculate the average, but cannot use filter, because I am in map block.
LabeledPoint(
event.someColumn,
Vectors.dense(
averageAbove,
...
)
)
如有任何帮助,我将不胜感激。谢谢!
您可以将列表添加为事件的新字段 - 通过获取新的 RDD(事件+实体列表)。然后,您可以使用常规 Spark 命令 "explode" 列表,从而获得多个事件+列表项记录(尽管使用 DataFrames/DataSets 比使用 RDD 更容易做到这一点)
一个选项(如果适用)正在加载 整个 HBase table(或者 - 可能与 events
中的事件之一匹配的所有元素RDD,如果你有任何方法可以在不通过 RDD 的情况下将它们隔离)到数据帧中,然后使用 join.
要将数据从 HBase table 加载到 Dataframe,您可以使用 Hortonworks 的预览 Spark-HBase Connector。然后,在两个数据帧之间执行正确的连接操作应该很容易。
很简单,你不能在 RDD Closure 上使用 spark 上下文,所以找到另一种方法来处理这个问题。