我如何让 pandas 使用 spark 集群

how do i let pandas working with spark cluster

pandas 的主要问题是它无法处理大量操作数据,大量 CSV 文件内存不足,现在我切换到 Hadoop 中的 pyspark 1.6,我已经尝试过 dask.dataframe 但是问题仍然存在,有什么原因让 pandas 与 Hadoop 集群或 pyspark 集群一起工作我想将此功能与 pandas

一起使用
import pandas as pd
df = pd.read_csv('text1.txt',names =['DATE','IMSI','WEBSITE','LINKUP','LINKDOWN','COUNT','CONNECTION'])
df.columns.str.strip()
df.DATE = pd.to_datetime(df.DATE)
group = df.groupby(['IMSI','WEBSITE']).agg({'DATE':[min,max,'count']
    ,'LINKUP':'sum'
    , 'LINKDOWN':'sum'
    , 'COUNT':'max'
    ,'CONNECTION':'sum'
            })
group.to_csv('finalinfo.txt', index = True, header = False)

从 HDFS 读取数据,汇总并发送回 pandas。下面的示例使用 inferSchema 根据数据获取列名和类型,但如果您的文件没有 headers 或者您不喜欢它推断的类型,您可以提供自己的模式。 InferSchema 需要额外传递数据,因此根据数据大小,您可能希望提供自己的架构,而不管:

from pyspark.sql import functions as f

df = spark.read.csv('/hdfs/path/to/text1.txt', header=1, inferSchema=True, sep=';') 
df = df.groupBy('IMSI','WEBSITE').agg(f.min('DATE').alias('min of date'),
                                      f.max('DATE').alias('max of date'),
                                      f.count('DATE').alias('count of date'),
                                      f.sum('LINKUP').alias('sum of linkup'),
                                      f.sum('LINKDOWN').alias('sum of linkdown'),
                                      f.count('COUNT').alias('count of count'),
                                      f.sum('CONNECTION').alias('sum of connection'))
pandasDF = df.toPandas()

或者,如果文件对于 pandas 来说仍然太大,您可以使用 spark 保存到 csv。请注意,您无法控制输出文件的名称 - 您只能指定将创建和存储输出的目录位置,文件名将遵循临时文件命名的 spark 约定:

df.coalesce(1).write.csv('/hdfs/path/to/output/directory', header=True)

coalesce(1) 用于获取单个文件作为输出,因为 spark 将创建等于分区数量的文件(默认 200 iirc)。为此,未分区的文件必须适合单个工作人员的内存。它仍然太大,不要使用合并。 Spark 会将其保存在多个文件中,然后您可以使用 HDFS getmerge 加入文件后缀。