pyspark rdd isCheckPointed() 为假
pyspark rdd isCheckPointed() is false
我在将 500 多列迭代添加到我的 pyspark 数据帧时遇到了 Whosebugerrors。所以,我包括了检查点。检查站没有帮助。因此,我创建了以下玩具应用程序来测试我的检查点是否正常工作。我在这个例子中所做的就是通过一遍又一遍地复制原始列来迭代创建列。我坚持,每 10 次迭代检查点和计数。我注意到我的 dataframe.rdd.isCheckpointed() 总是 returns False。我可以验证确实在磁盘上创建并填充了检查点文件夹。我 运行 在 glcoud 上使用 dataproc。
这是我的代码:
from pyspark import SparkContext, SparkConf
from pyspark import StorageLevel
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import sys
APP_NAME = "isCheckPointWorking"
spark = SparkSession\
.builder\
.appName(APP_NAME)\
.config("spark.sql.crossJoin.enabled","true")\
.getOrCreate()
sc = SparkContext.getOrCreate()
#set the checkpoint directory
sc.setCheckpointDir('gs://mybucket/checkpointtest/')
#create a spark dataframe with one column containing numbers 1 through 9
df4 = spark.createDataFrame(pd.DataFrame(np.arange(1,10),columns = ["A"]))
df4.show()
#create a list of new columns to be added to the dataframe
numberList = np.arange(0,40)
colNewList = ['col'+str(x) for x in numberList]
print(colNewList)
iterCount = 0
for colName in colNewList:
#copy column A in to the new column
df4 = df4.withColumn(colName,df4.A)
if (np.mod(iterCount,10) == 0):
df4 = df4.persist(StorageLevel.MEMORY_AND_DISK)
df4.checkpoint(eager=True)
df4.count()
#checking if underlying RDD is being checkpointed
print("is data frame checkpointed "+str(df4.rdd.isCheckpointed()))
iterCount +=1
不清楚为什么 df4.rdd.isCheckpointed() 每次都返回 False,当我看到正在填充检查点文件夹时。有什么想法吗?
检查点方法returns一个新的检查点数据集,它不会修改当前数据集。
改变
df4.checkpoint(eager=True)
到
df4 = df4.checkpoint(eager=True)
我在将 500 多列迭代添加到我的 pyspark 数据帧时遇到了 Whosebugerrors。所以,我包括了检查点。检查站没有帮助。因此,我创建了以下玩具应用程序来测试我的检查点是否正常工作。我在这个例子中所做的就是通过一遍又一遍地复制原始列来迭代创建列。我坚持,每 10 次迭代检查点和计数。我注意到我的 dataframe.rdd.isCheckpointed() 总是 returns False。我可以验证确实在磁盘上创建并填充了检查点文件夹。我 运行 在 glcoud 上使用 dataproc。
这是我的代码:
from pyspark import SparkContext, SparkConf
from pyspark import StorageLevel
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import sys
APP_NAME = "isCheckPointWorking"
spark = SparkSession\
.builder\
.appName(APP_NAME)\
.config("spark.sql.crossJoin.enabled","true")\
.getOrCreate()
sc = SparkContext.getOrCreate()
#set the checkpoint directory
sc.setCheckpointDir('gs://mybucket/checkpointtest/')
#create a spark dataframe with one column containing numbers 1 through 9
df4 = spark.createDataFrame(pd.DataFrame(np.arange(1,10),columns = ["A"]))
df4.show()
#create a list of new columns to be added to the dataframe
numberList = np.arange(0,40)
colNewList = ['col'+str(x) for x in numberList]
print(colNewList)
iterCount = 0
for colName in colNewList:
#copy column A in to the new column
df4 = df4.withColumn(colName,df4.A)
if (np.mod(iterCount,10) == 0):
df4 = df4.persist(StorageLevel.MEMORY_AND_DISK)
df4.checkpoint(eager=True)
df4.count()
#checking if underlying RDD is being checkpointed
print("is data frame checkpointed "+str(df4.rdd.isCheckpointed()))
iterCount +=1
不清楚为什么 df4.rdd.isCheckpointed() 每次都返回 False,当我看到正在填充检查点文件夹时。有什么想法吗?
检查点方法returns一个新的检查点数据集,它不会修改当前数据集。
改变
df4.checkpoint(eager=True)
到
df4 = df4.checkpoint(eager=True)