Why does Spark throw a not-serializable exception when changing an RDD to a DataFrame?
I am using Structured Streaming, and the following code works:
val j = new Jedis() // a Redis client, which is not serializable
xx.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long) => {
  j.xtrim(...)... // call Jedis functions here
  batchDF.rdd.mapPartitions(...)
}}
But the following code throws the exception object not serializable (class: redis.clients.jedis.Jedis, value: redis.clients.jedis.Jedis@a8e0378). The code has only one change (RDD to DataFrame):
val j = new Jedis() // a Redis client, which is not serializable
xx.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long) => {
  j.xtrim(...)... // call Jedis functions here
  batchDF.mapPartitions(...) // the only change: batchDF.rdd is now batchDF
}}
My Jedis code should execute on the driver and never reach the executors. I assumed the Spark RDD and DataFrame APIs would behave similarly here, so why does this happen?
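For context, here is a minimal runnable sketch of that setup, assuming Spark 2.4 and Jedis 3.x (the rate source, host, and key name are stand-ins I chose, not from the original code). The foreachBatch body itself runs on the driver once per micro-batch, so a driver-only Jedis call is fine as long as no executor-bound closure captures it:

import org.apache.spark.sql.{DataFrame, SparkSession}
import redis.clients.jedis.Jedis

val spark = SparkSession.builder.appName("demo").getOrCreate()
val xx = spark.readStream.format("rate").load() // stand-in streaming source

val j = new Jedis("localhost", 6379) // lives on the driver only

xx.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  j.xtrim("events", 1000L, true) // executed on the driver, never serialized
  println(s"batch $batchId: ${batchDF.count()} rows") // action with no captured state
}.start()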
I Ctrl-clicked into the lower-level code. batchDF.mapPartitions goes to:
@Experimental
@InterfaceStability.Evolving
def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
  new Dataset[U](
    sparkSession,
    MapPartitions[T, U](func, logicalPlan),
    implicitly[Encoder[U]])
}
while batchDF.rdd.mapPartitions goes to:
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}
My Spark version is 2.4.3. The one difference I can see between the two implementations is that the RDD version runs the function through sc.clean(f) (Spark's ClosureCleaner, which nulls out unused references to the enclosing scope before the closure is serialized), while the Dataset version hands it straight to the MapPartitions plan node.
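That difference suggests the Dataset closure is serialized as compiled, together with anything it drags in from the enclosing scope. A hedged sketch of the usual defensive style, capturing only serializable locals inside the foreachBatch body (the key name is a stand-in):

import org.apache.spark.sql.Encoders

val streamKey = "events" // a plain String: cheap and serializable to capture
val mapped = batchDF.mapPartitions { rows =>
  // reference only local, serializable values here; never j
  rows.map(r => s"$streamKey:${r.mkString(",")}")
}(Encoders.STRING)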
Below is the simplest version of my code, and I just discovered something else...
val j = new Jedis() // a Redis client, which is not serializable
xx.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long) => {
  j.xtrim(...)... // call Jedis functions here
  batchDF.mapPartitions(x => {
    val arr = x.grouped(2).toArray // this line matters
  })
  // the only change from the working version: batchDF.rdd is now batchDF
}}
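As written, that lambda returns Unit, so Dataset.mapPartitions needs a little more to compile: an Iterator result and an Encoder. A minimal compilable variant of the same repro (a sketch, assuming Spark 2.4; the returned value is chosen arbitrarily) would be:

import org.apache.spark.sql.Encoders

batchDF.mapPartitions { x =>
  val arr = x.grouped(2).toArray // this line matters
  Iterator(arr.length) // return an Iterator so the signature is satisfied
}(Encoders.scalaInt)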
See this DataFrame API implementation, which internally calls rdd.mapPartitions with your function:
/**
 * Returns a new RDD by applying a function to each partition of this DataFrame.
 * @group rdd
 * @since 1.3.0
 */
def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = {
  rdd.mapPartitions(f)
}
You may have made a mistake somewhere else, because there is no difference between the two.
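For what it is worth, the asker's working variant goes through the RDD API explicitly. A minimal sketch of that route (the per-partition logic is a stand-in), which keeps the closure on the sc.clean path quoted above:

val counts = batchDF.rdd.mapPartitions { rows =>
  rows.grouped(2).map(_.length) // the same grouped logic, no serialization error here
}
println(counts.count())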
AFAIK, ideally it should be like this:
batchDF.mapPartitions { yourPartition =>
  // better to create a JedisPool and borrow a connection rather than new Jedis
  val j = new Jedis()
  // map is lazy, so materialize the results before closing the connection
  val result = yourPartition.map { row =>
    // do some processing with j here
    row
  }.toList
  j.close() // release and take care of connections/resources here
  result.iterator
}
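Expanding on the JedisPool hint, a fuller sketch (host, port, and row handling are stand-ins; assumes Spark 2.4 and Jedis 3.x): keep one pool per executor JVM in a lazily initialized singleton, borrow a connection per partition, and return it when the partition is done.

import org.apache.spark.sql.Encoders
import redis.clients.jedis.{Jedis, JedisPool}

// A singleton object is initialized lazily once per JVM and is never serialized,
// so each executor builds its own pool on first use.
object RedisPool {
  lazy val pool = new JedisPool("localhost", 6379)
}

val out = batchDF.mapPartitions { rows =>
  val j: Jedis = RedisPool.pool.getResource // borrow a connection for this partition
  try {
    rows.map(r => r.mkString(",")).toList.iterator // materialize before the connection is returned
  } finally {
    j.close() // in Jedis 3.x this returns the connection to the pool
  }
}(Encoders.STRING)
out.count() // force evaluation so the batch actually runs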