PySpark: How to create JSON and CSV file from a variable in pyspark?
I am trying to write the result of a variable to a CSV file and then create a JSON from it. Each iteration of the for loop writes a result like the following to the variable res_df. If the JSON can be created directly, without creating the CSV first, I would be happy to do that instead. Please help.
'var_id', 10000001, 14003088.0, 14228946.912793402, 1874168.857698741, 15017976.0, 18000192, 0
Now I want to append this result to a CSV file and then create a JSON from it. I have already implemented this in my Python code; now I need help achieving the same in PySpark.
My Python code:
import csv
import pandas as pd

# One row of column statistics: name, min, Q1, mean, std, Q3, max, % null
res_df = line, x.min(), np.percentile(x, 25), np.mean(x), np.std(x), np.percentile(x, 75), x.max(), df[line].isnull().mean() * 100
with open(data_output_file, 'a', newline='') as csvfile:
    writer = csv.writer(csvfile, delimiter=',',
                        quotechar='"', quoting=csv.QUOTE_MINIMAL)
    writer.writerow(res_df)  # the map(lambda x: x, ...) wrapper was a no-op

quality_json_df = pd.read_csv(r'./DQ_RESULT.csv')
# dump the JSON to a file, one object per CSV row
quality_json_df.to_json("./Dq_Data.json", orient="records")
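If pandas is not available, the same CSV-to-JSON round trip can be done with the standard library alone. A minimal sketch, assuming a header row is written first (the file names and the sample row are placeholders taken from the question):

```python
import csv
import json

# Hypothetical header and one statistics row, mirroring the question's output
header = ["variable_name", "min", "Q1", "mean", "std", "Q3", "max", "null_value"]
row = ["var_id", 10000001, 14003088.0, 14228946.9, 1874168.86, 15017976.0, 18000192, 0]

with open("DQ_RESULT.csv", "w", newline="") as f:
    writer = csv.writer(f, quotechar='"', quoting=csv.QUOTE_MINIMAL)
    writer.writerow(header)
    writer.writerow(row)

# Read the rows back as dicts and dump records-oriented JSON,
# similar to pandas to_json(orient="records")
with open("DQ_RESULT.csv", newline="") as f:
    records = list(csv.DictReader(f))

with open("Dq_Data.json", "w") as fp:
    json.dump(records, fp)
```

Note that csv.DictReader returns every field as a string; pandas, by contrast, infers numeric types when it reads the CSV back.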
My PySpark code:
for line in tcp.collect():
    # line is the column name for this iteration
    print(line)
    v3 = np.array(data.select(line).collect())
    x = v3[np.logical_not(np.isnan(v3))]  # drop NaN values
    print(x)
    cnt_null = data.filter((data[line] == "") | data[line].isNull() | isnan(data[line])).count()
    print(cnt_null)
    res_df = line, x.min(), np.percentile(x, 25), np.mean(x), np.std(x), np.percentile(x, 75), x.max(), cnt_null
    print(res_df)
Answer: build the JSON rows directly inside the loop and skip the intermediate CSV:

import json

json_output = []
column_statistic = ["variable_name", "min", "Q1", "mean", "std", "Q3", "max", "null_value"]
for line in tcp.collect():
    # line is the column name for this iteration
    print(line)
    v3 = np.array(data.select(line).collect())
    x = v3[np.logical_not(np.isnan(v3))]  # drop NaN values
    notnan_cnt = np.count_nonzero(v3)
    print(x)
    cnt_null = data.filter((data[line] == "") | data[line].isNull() | isnan(data[line])).count()
    print(cnt_null, notnan_cnt)
    res_df = [str(line), x.min(), np.percentile(x, 25), np.mean(x), np.std(x), np.percentile(x, 75), x.max(), cnt_null]
    # pair each statistic with its column name and collect it as one JSON record
    json_row = {key: value for key, value in zip(column_statistic, res_df)}
    json_output.append(json_row)
    print(res_df)

with open("json_result.json", "w") as fp:
    json.dump(json_output, fp)