如何使用多个值更改 spark 中 DataFrame 的 na 值
How to change na values of a DataFrame in spark with multiple values
我有一个有时包含空值的数据框,我想用一系列值(例如 0 到 100 之间的随机整数)中的单个值代替,而不是始终相同的值。
na.fill()
函数好像不允许这样做,我找不到手动完成的好方法。
我在 Python & Spark 2.2
工作
逐一检查每个单元格,检查值是否为空,如果为空,则将其更改为随机数。
首先,随机导入。然后是这样的:
df = df.where(df.a.isNull()).replace(null, random.randrange(min, max+1))
我终于想到了以下解决方案,希望它可以帮助某些人满足我的特定需求,即从数据帧的列中删除空值并用随机值替换它们:
def newRow(model,dataframe):
rows=[]
limit = 0
exec("limit = dataframe.where(dataframe." + model[0] + ".isNull()).count()")
for i in range(0, limit):
x = ""
exec("x = dataframe.where(dataframe."+ model[0] +".isNull()).collect()[i]")
schema = StructType([StructField("A", StringType(), True),
StructField("B", StringType(), True),
StructField("C", StringType(), True),
StructField("D", StringType(), True),
StructField("E", StringType(), True)])
A = None
B = None
C = None
D = None
E = None
if x["A"] != None and model[0] != "A":
A = x["A"].encode('ascii')
if x["B"] != None and model[0] != "B":
B = x["B"].encode('ascii')
if x["C"] != None and model[0] != "C":
C = x["C"].encode('ascii')
if x["D"] != None and model[0] != "D":
D = x["D"].encode('ascii')
if x["E"] != None and model[0] != "E":
E = x["E"].encode('ascii')
exec(model[0] + "=" + model[1])
rows.append(Row(A, B, C, D, E))
return sqlContext.createDataFrame(rows,schema)
如何调用方法:
dfAmodel = newRow(("A","random.uniform(40, 80)"), df1)
df2 = df1.na.drop(subset=['A']).union(dfAmodel)
此处 dfAmodel 是一个新数据框,其行数与输入数据框 df1 中 A 列的空值一样多。仅更改 A 中的空值,其他列的值保持不变。
杂项:
exec() 非常有用,因为它从字符串中执行代码。
解决方案的优势在于能够为您需要的任意多的 df 重复使用此方法,只需更改您想要作为输入的数据框,指定要考虑的列以及将从中获取新闻值的公式。
我有一个有时包含空值的数据框,我想用一系列值(例如 0 到 100 之间的随机整数)中的单个值代替,而不是始终相同的值。
na.fill()
函数好像不允许这样做,我找不到手动完成的好方法。
我在 Python & Spark 2.2
逐一检查每个单元格,检查值是否为空,如果为空,则将其更改为随机数。
首先,随机导入。然后是这样的:
df = df.where(df.a.isNull()).replace(null, random.randrange(min, max+1))
我终于想到了以下解决方案,希望它可以帮助某些人满足我的特定需求,即从数据帧的列中删除空值并用随机值替换它们:
def newRow(model,dataframe):
rows=[]
limit = 0
exec("limit = dataframe.where(dataframe." + model[0] + ".isNull()).count()")
for i in range(0, limit):
x = ""
exec("x = dataframe.where(dataframe."+ model[0] +".isNull()).collect()[i]")
schema = StructType([StructField("A", StringType(), True),
StructField("B", StringType(), True),
StructField("C", StringType(), True),
StructField("D", StringType(), True),
StructField("E", StringType(), True)])
A = None
B = None
C = None
D = None
E = None
if x["A"] != None and model[0] != "A":
A = x["A"].encode('ascii')
if x["B"] != None and model[0] != "B":
B = x["B"].encode('ascii')
if x["C"] != None and model[0] != "C":
C = x["C"].encode('ascii')
if x["D"] != None and model[0] != "D":
D = x["D"].encode('ascii')
if x["E"] != None and model[0] != "E":
E = x["E"].encode('ascii')
exec(model[0] + "=" + model[1])
rows.append(Row(A, B, C, D, E))
return sqlContext.createDataFrame(rows,schema)
如何调用方法:
dfAmodel = newRow(("A","random.uniform(40, 80)"), df1)
df2 = df1.na.drop(subset=['A']).union(dfAmodel)
此处 dfAmodel 是一个新数据框,其行数与输入数据框 df1 中 A 列的空值一样多。仅更改 A 中的空值,其他列的值保持不变。
杂项:
exec() 非常有用,因为它从字符串中执行代码。
解决方案的优势在于能够为您需要的任意多的 df 重复使用此方法,只需更改您想要作为输入的数据框,指定要考虑的列以及将从中获取新闻值的公式。