如何将 excel 数据读入 spark/scala 中的数据帧

how to read excel data into a dataframe in spark/scala

我有一个要求where-in我需要阅读spark/scala中的excel文件(扩展名为.xlsx)。我需要创建一个数据框,其中包含从 excel 和 apply/write sql 查询中读取的数据,并在其上进行一些分析。 excel 文件有一些列 headers/titles,例如“time_spend_company(年)”、“average_monthly_hours(小时)" 等作为 headers 本身的空格,这些空格导致我无法在加载的数据帧上应用任何 sql 查询。

我正在使用 com.crealytics.spark.excel 库来解析 excel 内容,我的代码如下所示

val empFile = "C:\EmpDatasets.xlsx"

val employeesDF = sc.sqlContext.read
  .format("com.crealytics.spark.excel")
  .option("sheetName", "Sheet1")
  .option("useHeader", "true")
  .option("treatEmptyValuesAsNulls", "false")
  .option("inferSchema", "false")
  .option("location", empFile)
  .option("addColorColumns", "False")
  .load()

employeesDF.createOrReplaceTempView("EMP")

我想在这些列上应用一些 group by 和其他聚合函数,但我面临这些列的问题,如下所示,我的要求是在 [= 上应用 group by 45=] 列并计算它的数量。

val expLevel = sc.sqlContext.sql("Select 'time_spend_company (Years)' as 'Years_spent_in_company',count(1) from EMP where left_company = 1 group by 'time_spend_company (Years)'")
expLevel.show

我需要帮助:-

  1. 有没有更好的方法来加载 excel 并为其分配自定义列名并创建数据框?
  2. 如何为这些包含空格的列名编写 sql 查询?

注意:我只需要将其作为 excel 文件阅读,我无法将其转换为 csv 或任何其他文件格式。

  1. Spark 对处理 CSV 有很好的支持。因此,如果您的 excel 文件只有一个 sheet,您只需将 EmpDatasets.xlsx 重命名为 EmpDatasets.csv 即可将其转换为 CSV。使用this来完成。

一旦您的文件为 CSV,您就可以将其读取为 spark.read.csv(pathToCSV) 并且可以提供许多选项,例如:到 read/skip header 或将数据集的模式提供为 spark.read.schema(schema).csv(pathToCSV).

这里schema可以按描述创建here or can be extracted from a case class using spark sql EncodersEncoders.product[case_class_name].schema

  1. 您可以删除列名中的空格,例如:

val employeesDFColumns = employeesDF.columns.map(x => col(x.replaceAll(" ", "")))

并在数据框上应用这些新的列名。

val employeeDF = employeeDF.select(employeesDFColumns:_*)

问题 2 的答案:尽管使用 ',但您需要在列名的开头和结尾前使用 `,并带有空格。尝试下面的查询它会起作用:

val expLevel = sc.sqlContext.sql("Select `time_spend_company (Years)` as `Years_spent_in_company`,count(1) from EMP where left_company = 1 group by `time_spend_company (Years)`")

问题 1:使用 "com.crealytics.spark.excel" 加载 excel 是可以的。我也在用。也可以有不同的选择。要分配不同的列名,您可以使用结构类型来定义模式并在将数据加载到数据帧期间将其强加。例如

val newSchema = StructType(
    List(StructField("a", IntegerType, nullable = true),
         StructField("b", IntegerType, nullable = true),
         StructField("c", IntegerType, nullable = true),
         StructField("d", IntegerType, nullable = true))
  )

val employeesDF = spark.read.schema(newSchema)
  .format("com.crealytics.spark.excel")
  .option("sheetName", "Sheet1")
  .option("useHeader", "true")
  .option("treatEmptyValuesAsNulls", "false")
  .option("inferSchema", "false")
  .option("location", empFile)
  .option("addColorColumns", "False")
  .load()

前四个列名现在将由 a、b、c 和 d 访问。 运行 下面的查询将适用于新的列名。

sc.sqlContext.sql("select a,b,c,d from EMP").show()

对于版本 0.13.5,您将需要一组不同的参数:

def readExcel(file: String): DataFrame = {
    sqlContext.read
      .format("com.crealytics.spark.excel")
      .option("dataAddress", "'sheet_name'!A1") // Optional, default: "A1"
      .option("header", "true") // Required
      .option("treatEmptyValuesAsNulls", "false") // Optional, default: true
      .option("inferSchema", "true") // Optional, default: false
      .option("addColorColumns", "false") // Optional, default: false
      .option("timestampFormat", "MM-dd-yyyy HH:mm:ss") // Optional, default: yyyy-mm-dd hh:mm:ss[.fffffffff]
      .option("maxRowsInMemory", 20) // Optional, d[#All]efault None. If set, uses a streaming reader which can help with big files
      .load(file)
  }

maven 依赖:

<dependency>
  <groupId>com.crealytics</groupId>
  <artifactId>spark-excel_2.11</artifactId>
  <version>0.13.5</version>
</dependency>