我如何让 pandas 使用 spark 集群
how do i let pandas working with spark cluster
pandas 的主要问题是它无法处理大量操作数据,大量 CSV 文件内存不足,现在我切换到 Hadoop 中的 pyspark 1.6,我已经尝试过 dask.dataframe 但是问题仍然存在,有什么原因让 pandas 与 Hadoop 集群或 pyspark 集群一起工作我想将此功能与 pandas
一起使用
import pandas as pd
df = pd.read_csv('text1.txt',names =['DATE','IMSI','WEBSITE','LINKUP','LINKDOWN','COUNT','CONNECTION'])
df.columns.str.strip()
df.DATE = pd.to_datetime(df.DATE)
group = df.groupby(['IMSI','WEBSITE']).agg({'DATE':[min,max,'count']
,'LINKUP':'sum'
, 'LINKDOWN':'sum'
, 'COUNT':'max'
,'CONNECTION':'sum'
})
group.to_csv('finalinfo.txt', index = True, header = False)
从 HDFS 读取数据,汇总并发送回 pandas。下面的示例使用 inferSchema 根据数据获取列名和类型,但如果您的文件没有 headers 或者您不喜欢它推断的类型,您可以提供自己的模式。 InferSchema 需要额外传递数据,因此根据数据大小,您可能希望提供自己的架构,而不管:
from pyspark.sql import functions as f
df = spark.read.csv('/hdfs/path/to/text1.txt', header=1, inferSchema=True, sep=';')
df = df.groupBy('IMSI','WEBSITE').agg(f.min('DATE').alias('min of date'),
f.max('DATE').alias('max of date'),
f.count('DATE').alias('count of date'),
f.sum('LINKUP').alias('sum of linkup'),
f.sum('LINKDOWN').alias('sum of linkdown'),
f.count('COUNT').alias('count of count'),
f.sum('CONNECTION').alias('sum of connection'))
pandasDF = df.toPandas()
或者,如果文件对于 pandas 来说仍然太大,您可以使用 spark 保存到 csv。请注意,您无法控制输出文件的名称 - 您只能指定将创建和存储输出的目录位置,文件名将遵循临时文件命名的 spark 约定:
df.coalesce(1).write.csv('/hdfs/path/to/output/directory', header=True)
coalesce(1) 用于获取单个文件作为输出,因为 spark 将创建等于分区数量的文件(默认 200 iirc)。为此,未分区的文件必须适合单个工作人员的内存。它仍然太大,不要使用合并。 Spark 会将其保存在多个文件中,然后您可以使用 HDFS getmerge 加入文件后缀。
pandas 的主要问题是它无法处理大量操作数据,大量 CSV 文件内存不足,现在我切换到 Hadoop 中的 pyspark 1.6,我已经尝试过 dask.dataframe 但是问题仍然存在,有什么原因让 pandas 与 Hadoop 集群或 pyspark 集群一起工作我想将此功能与 pandas
一起使用import pandas as pd
df = pd.read_csv('text1.txt',names =['DATE','IMSI','WEBSITE','LINKUP','LINKDOWN','COUNT','CONNECTION'])
df.columns.str.strip()
df.DATE = pd.to_datetime(df.DATE)
group = df.groupby(['IMSI','WEBSITE']).agg({'DATE':[min,max,'count']
,'LINKUP':'sum'
, 'LINKDOWN':'sum'
, 'COUNT':'max'
,'CONNECTION':'sum'
})
group.to_csv('finalinfo.txt', index = True, header = False)
从 HDFS 读取数据,汇总并发送回 pandas。下面的示例使用 inferSchema 根据数据获取列名和类型,但如果您的文件没有 headers 或者您不喜欢它推断的类型,您可以提供自己的模式。 InferSchema 需要额外传递数据,因此根据数据大小,您可能希望提供自己的架构,而不管:
from pyspark.sql import functions as f
df = spark.read.csv('/hdfs/path/to/text1.txt', header=1, inferSchema=True, sep=';')
df = df.groupBy('IMSI','WEBSITE').agg(f.min('DATE').alias('min of date'),
f.max('DATE').alias('max of date'),
f.count('DATE').alias('count of date'),
f.sum('LINKUP').alias('sum of linkup'),
f.sum('LINKDOWN').alias('sum of linkdown'),
f.count('COUNT').alias('count of count'),
f.sum('CONNECTION').alias('sum of connection'))
pandasDF = df.toPandas()
或者,如果文件对于 pandas 来说仍然太大,您可以使用 spark 保存到 csv。请注意,您无法控制输出文件的名称 - 您只能指定将创建和存储输出的目录位置,文件名将遵循临时文件命名的 spark 约定:
df.coalesce(1).write.csv('/hdfs/path/to/output/directory', header=True)
coalesce(1) 用于获取单个文件作为输出,因为 spark 将创建等于分区数量的文件(默认 200 iirc)。为此,未分区的文件必须适合单个工作人员的内存。它仍然太大,不要使用合并。 Spark 会将其保存在多个文件中,然后您可以使用 HDFS getmerge 加入文件后缀。