Pyspark 将多个 csv 文件读入数据框(或 RDD?)

Pyspark read multiple csv files into a dataframe (OR RDD?)

我有一个 Spark 2.0.2 集群,我正在通过 Jupyter Notebook 通过 Pyspark 访问它。我有多个管道分隔的 txt 文件(加载到 HDFS。但也可在本地目录中使用)我需要使用 spark-csv 加载到三个单独的数据帧中,具体取决于文件的名称。

我看到了三种我可以采用的方法 - 或者我可以使用 python 以某种方式遍历 HDFS 目录(还没有想出如何做到这一点,加载每个文件然后进行合并。

我也知道 spark 中存在一些通配符功能(参见 )——我可能可以利用

最后,我可以使用 pandas 从磁盘加载 vanilla csv 文件作为 pandas 数据帧,然后创建一个 spark 数据帧。这里的缺点是这些文件很大,在单个节点上加载到内存中可能需要 ~8gb。 (这就是为什么这首先要转移到一个集群)。

这是我目前的代码和这两种方法的一些伪代码:

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

sc = pyspark.SparkContext(appName = 'claims_analysis', master='spark://someIP:7077')

spark = SparkSession(sc)

#METHOD 1 - iterate over HDFS directory
for currFile in os.listdir(HDFS:///someDir//):
    if #filename contains 'claim':
        #create or unionAll to merge claim_df
    if #filename contains 'pharm':
        #create or unionAll to merge pharm_df
    if #filename contains 'service':
        #create or unionAll to merge service_df

#Method 2 - some kind of wildcard functionality
claim_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<claim>.csv')
pharm_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<pharm>.csv')
service_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<service>.csv')


#METHOD 3 - load to a pandas df and then convert to spark df
for currFile in os.listdir(HDFS:///someDir//)
    pd_df = pd.read_csv(currFile, sep = '|')
    df = spark.createDataFrame(pd_df)
    if #filename contains 'claim':
        #create or unionAll to merge claim_df
    if #filename contains 'pharm':
        #create or unionAll to merge pharm_df
    if #filename contains 'service':
        #create or unionAll to merge service_df

有谁知道如何实现方法 1 或 2?我一直无法弄清楚这些。此外,令我感到惊讶的是,没有更好的方法将 csv 文件加载到 pyspark 数据帧中 - 使用第三方包来处理看起来应该是本机功能的东西让我感到困惑(我只是错过了标准用例吗用于将 csv 文件加载到数据帧中?)最终,我将把一个合并的单个数据帧写回 HDFS(使用 .write.parquet() ),这样我就可以清除内存并使用 MLlib 进行一些分析.如果我强调的方法不是最佳实践,我将不胜感激推动正确的方向!

方法一:

在 python 中您不能直接引用 HDFS 位置。你需要借助另一个库,比如 pydoop。在 scala 和 java 中,你有 API。即使使用 pydoop,您也会一个接一个地阅读文件。一个一个读取文件,不使用spark提供的并行读取选项是不好的。

方法二:

您应该能够用逗号分隔或通配符指向多个文件。这样 spark 负责读取文件并将它们分发到分区中。但是,如果您对每个数据框使用 union 选项,那么当您动态读取每个文件时,就会出现一种边缘情况。当您有很多文件时,列表在驱动程序级别会变得非常庞大,并可能导致内存问题。主要原因是,读取过程仍在驱动程序级别发生。

这个选项更好。 spark 会读取所有与正则表达式相关的文件并将它们转换为分区。您为所有通配符匹配得到一个 RDD,从那里您无需担心单个 rdd 的并集

示例代码片段:

distFile = sc.textFile("/hdfs/path/to/folder/fixed_file_name_*.csv")

方法 3:

除非您在 python 中有一些遗留应用程序使用 pandas 的功能,否则我更愿意使用提供的 spark API

我来到这里是为了完成类似的事情。我有一个函数可以读取 HDFS 和 return 列表字典。

def get_hdfs_input_files(hdfs_input_dir):
    """Returns a dictionary object with a file list from HDFS
    :rtype: dict
    """
    import subprocess
    sub_proc_cmd = "hdfs dfs -ls " + hdfs_input_dir + " | awk '{print }'"
    process = subprocess.run(sub_proc_cmd, shell=True, stdout=subprocess.PIPE)
    decoded_process = process.stdout.decode('utf-8')
    file_list = decoded_process.split("\n")
    claim_list, pharma_list, service_list = [], [], []
    for file in file_list:
        if file[-4:] == 'claim':
            claim_list.append(file)
        elif file[-4:] == 'pharma':
            pharma_list.append(file)
        elif file[-3:] == 'service':
            service_list.append(file)
    ret_dict = {'claim': claim_list, 'pharma': pharma_list, 'service': service_list}
    return ret_dict

获得 CSV 文件列表后,您可以使用 Pyspark 将它们全部读入 RDD。 docs 声明 CSV DataFrameReader 将接受 "string, or list of strings, for input path(s), or RDD of Strings storing CSV rows"。只需将文件列表传递给该方法即可。

file_list = get_hdfs_input_files('/some/hdfs/dir')
claim_df = spark.read.csv(my_list.get('claim'), 
               delimiter = '|',header ='true',nullValue ='null')  
pharma_df = spark.read.csv(my_list.get('pharma'), 
               delimiter = '|',header ='true',nullValue ='null')
service_df = spark.read.csv(my_list.get('service'), 
               delimiter = '|',header ='true',nullValue ='null')