如何使用 pyspark 将 sql 语句的结果发送到 for 循环?

How to send the result of a sql statement to a for loop using pyspark?

我正在尝试将 sql 结果发送到 for 循环。我是 spark 的新手,python,请帮忙。

    from pyspark import SparkContext
sc =SparkContext()
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
#bank = hive_context.table("cip_utilities.file_upload_temp")
data=hive_context.sql("select * from cip_utilities.cdm_variable_dict")
hive_context.sql("describe cip_utilities.cdm_variables_dict").registerTempTable("schema_def")
temp_data=hive_context.sql("select * from schema_def")
temp_data.show()
data1=hive_context.sql("select col_name from schema_def where data_type<>'string'")
data1.show()
  • 使用DataFrame.collect() method,它将来自所有executorSpark-SQL查询结果聚合到driver.

  • collect()方法会return一个Pythonlist,其中每个元素都是一个SparkRow

  • 然后您可以在 for-loop

  • 中迭代此列表

代码片段:

data1 = hive_context.sql("select col_name from schema_def where data_type<>'string'")
colum_names_as_python_list_of_rows = data1.collect()

我认为您需要问问自己为什么 想要遍历数据。

你在做聚合吗?转换数据?如果是这样,请考虑使用 spark API.

正在打印一些文字?如果是这样,则使用 .collect() 并将数据检索回您的驱动程序进程。然后你可以用通常的 python 方式循环结果。