Spark RDD 和 Dataframe 转换优化
Spark RDD and Dataframe transformation optimisation
我是 Spark 的新手,有以下关于 RDD 和 Dataframes 的高级问题,如果我没记错的话,它们是建立在 RDD 之上的:
我知道可以对 RDD 进行两种类型的操作,转换和操作。我还了解到,仅当对作为该转换产物的 RDD 执行操作时才执行转换。鉴于 RDD 在内存中,我想知道是否有可能优化这些 RDD 消耗的内存量,举个例子:
KafkaDF = KafkaDFRaw.select(
KafkaDFRaw.key,
KafkaDFRaw.value,
KafkaDFRaw.topic,
unix_timestamp('timestamp',
'yyyy-MM-dd HH:mm:ss').alias('kafka_arrival_time')
).withColumn("spark_arrival_time", udf(time.time, DoubleType())())
我有一个 KafkaDFRaw 数据框,我生成了一个名为 KafkaDF 的新 RDD。然后我希望向这个新的 RDD 添加列。我应该将它们添加到现有的 RDD 中吗?像这样:
decoded_value_udf = udf(lambda value: value.decode("utf-8"))
KafkaDF = KafkaDF\
.withColumn(
"cleanKey", decoded_value_udf(KafkaDF.key))\
.withColumn(
"cleanValue", decoded_value_udf(KafkaDF.value))
或者我应该从上一个数据框创建一个新的数据框吗?像这样:
decoded_value_udf = udf(lambda value: value.decode("utf-8"))
KafkaDF_NEW = KafkaDF\
.withColumn(
"cleanKey", decoded_value_udf(KafkaDF.key))\
.withColumn(
"cleanValue", decoded_value_udf(KafkaDF.value))
这对内存优化有影响吗?
提前感谢您的帮助。
每当调用操作时,都会执行优化的 dag,并按计划使用内存。
大家可以对比一下执行计划来了解:
df.explain(true)
df_new.explain(true)
在两者之间创建额外的变量来保存转换不会影响内存利用率。内存要求将取决于数据大小、分区大小、随机播放等。
我是 Spark 的新手,有以下关于 RDD 和 Dataframes 的高级问题,如果我没记错的话,它们是建立在 RDD 之上的:
我知道可以对 RDD 进行两种类型的操作,转换和操作。我还了解到,仅当对作为该转换产物的 RDD 执行操作时才执行转换。鉴于 RDD 在内存中,我想知道是否有可能优化这些 RDD 消耗的内存量,举个例子:
KafkaDF = KafkaDFRaw.select(
KafkaDFRaw.key,
KafkaDFRaw.value,
KafkaDFRaw.topic,
unix_timestamp('timestamp',
'yyyy-MM-dd HH:mm:ss').alias('kafka_arrival_time')
).withColumn("spark_arrival_time", udf(time.time, DoubleType())())
我有一个 KafkaDFRaw 数据框,我生成了一个名为 KafkaDF 的新 RDD。然后我希望向这个新的 RDD 添加列。我应该将它们添加到现有的 RDD 中吗?像这样:
decoded_value_udf = udf(lambda value: value.decode("utf-8"))
KafkaDF = KafkaDF\
.withColumn(
"cleanKey", decoded_value_udf(KafkaDF.key))\
.withColumn(
"cleanValue", decoded_value_udf(KafkaDF.value))
或者我应该从上一个数据框创建一个新的数据框吗?像这样:
decoded_value_udf = udf(lambda value: value.decode("utf-8"))
KafkaDF_NEW = KafkaDF\
.withColumn(
"cleanKey", decoded_value_udf(KafkaDF.key))\
.withColumn(
"cleanValue", decoded_value_udf(KafkaDF.value))
这对内存优化有影响吗?
提前感谢您的帮助。
每当调用操作时,都会执行优化的 dag,并按计划使用内存。 大家可以对比一下执行计划来了解:
df.explain(true)
df_new.explain(true)
在两者之间创建额外的变量来保存转换不会影响内存利用率。内存要求将取决于数据大小、分区大小、随机播放等。