Read CSV as dataframe and convert to JSON string
I'm trying to aggregate a CSV file via Spark SQL and then show the result as JSON:
val people = sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", ",")
.load("/tmp/people.csv")
people.registerTempTable("people")
val result = sqlContext.sql("select country, count(*) as cnt from people group by country")
This is where I'm stuck. I can call result.schema().prettyJson(), which works perfectly, but I can't find a way to return result itself as JSON.
I assumed result.toJSON.collect() would do what I want, but it fails with a
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 101.0 failed 1 times, most recent failure: Lost task 1.0 in stage 101.0 (TID 159, localhost): java.lang.NegativeArraySizeException
at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan.apply(CsvRelation.scala:171)
at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan.apply(CsvRelation.scala:162)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:511)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$$anonfun.apply(TungstenAggregate.scala:95)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$$anonfun.apply(TungstenAggregate.scala:86)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:704)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:704)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
error. Can anyone point me in the right direction?
The error you're getting is strange; it sounds like the result might be empty?
You may want to try this command on the dataframe to print each row:
result.toJSON.foreach(println)
See the Dataframe API for more information.
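If you want the whole result as a single JSON string rather than printing row by row, a minimal sketch (assuming the same result dataframe as above and the Spark 1.x API, where toJSON returns an RDD[String]; wrapping the rows in a JSON array is my own choice, not part of the API) would be:
// toJSON produces one JSON document per row; collect() brings them to the driver,
// so only do this when the aggregated result is small enough to fit in memory.
val jsonRows = result.toJSON.collect()
val jsonString = jsonRows.mkString("[", ",", "]")
println(jsonString) // e.g. [{"country":"US","cnt":42},{"country":"DE","cnt":7}] (values illustrative)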
It turned out the error was caused by a "malformed" CSV file: it contained some rows with more columns than the others (and no header field names for them)... an odd error message for that, though.
Try
val people = sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.option("mode", "DROPMALFORMED")
.option("delimiter", ",")
.load("/tmp/people.csv")
people.registerTempTable("people")
val result = sqlContext.sql("select country, count(*) as cnt from people group by country")
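To illustrate what DROPMALFORMED does, here is a hypothetical /tmp/people.csv (the contents are made up for illustration; the last row has an extra column):
name,age,country
Alice,30,US
Bob,25,DE
Carol,41,US,oops_extra_field
With mode set to DROPMALFORMED, spark-csv skips the Carol row because its token count does not match the inferred schema, so the aggregation and result.toJSON.collect() can run without hitting the NegativeArraySizeException.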