Read CSV as dataframe and convert to JSON string

I am trying to aggregate a CSV file via Spark SQL and then show the result as JSON:

val people = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("delimiter", ",")
  .load("/tmp/people.csv")
people.registerTempTable("people")
val result = sqlContext.sql("select country, count(*) as cnt from people group by country")

This is where I am stuck. result.schema.prettyJson works flawlessly, but I cannot find a way to return the result itself as JSON.

I assumed result.toJSON.collect() would do what I want, but it fails with the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 101.0 failed 1 times, most recent failure: Lost task 1.0 in stage 101.0 (TID 159, localhost): java.lang.NegativeArraySizeException
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan.apply(CsvRelation.scala:171)
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan.apply(CsvRelation.scala:162)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:511)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$$anonfun.apply(TungstenAggregate.scala:95)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$$anonfun.apply(TungstenAggregate.scala:86)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:704)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:704)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Can anyone point me in the right direction?

The error you are getting is strange; it sounds like the result may be empty?

You could try this command on the dataframe to print each row:

result.toJSON.foreach(println)

For more information, see the DataFrame API documentation.
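
If you need the result as a single JSON string rather than printed rows, one option (a minimal sketch, assuming the aggregated result is small enough to collect to the driver) is to gather the per-row JSON and join it into a JSON array:

// toJSON yields one JSON document per row; collect() brings them to the
// driver, and mkString wraps them into a single JSON array string.
// Assumption: result is small (it is a grouped count), so collect() is safe.
val json = result.toJSON.collect().mkString("[", ",", "]")
println(json)  // e.g. [{"country":"US","cnt":3},{"country":"DE","cnt":1}]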

It turns out the error was caused by a "malformed" CSV file: it contained some rows with more columns than there are header field names. Strange error message for that, though.

Try:

val people = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("mode", "DROPMALFORMED")
  .option("delimiter", ",")
  .load("/tmp/people.csv")
people.registerTempTable("people")
val result = sqlContext.sql("select country, count(*) as cnt from people group by country")
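
If you would rather be alerted to bad rows than silently drop them, spark-csv's mode option also accepts FAILFAST (PERMISSIVE, the default, instead pads short rows with nulls). A sketch of the stricter variant:

// Same load, but abort with an error on the first malformed line instead
// of dropping it, so data problems surface immediately.
val strictPeople = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("mode", "FAILFAST")
  .option("delimiter", ",")
  .load("/tmp/people.csv")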