来自 SPARK 中管道分隔文件的数据帧
Dataframes from pipe delimited file in SPARK
我是 SPARK 的新手,所以尝试编写一个小程序并 运行 出现以下错误。
有人可以帮忙吗?
仅供参考 - 当示例文件的列中没有空数据时,该程序似乎可以正常工作,但问题似乎是由于第二行中的空值所致。
数据:TEMP_EMP.dat
的内容
1232|JOHN|30|IT
1532|DAVE|50|
1542|JEN|25|QA
用于将此数据解析为数据帧的 SCALA 代码
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType, StructField, StringType};
val employee = sc.textFile("file:///TEMP_EMP.dat")
val textFileTemp = sc.textFile("file:///TEMP_EMP.dat");
val schemaString = "ID|NAME|AGE|DEPT";
val schema = StructType(schemaString.split('|').map(fieldName=>StructField(fieldName,StringType,true)));
val rowRDD = employee.map(_.split('|')).map(e => Row(e(0),e(1),e(2), e(3) ));
val employeeDF = sqlContext.createDataFrame(rowRDD, schema);
employeeDF.registerTempTable("employee");
val allrecords = sqlContext.sql("SELECT * FROM employee");
allrecords.show();
错误日志:
WARN 2016-08-17 13:36:21,006 org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 6.0 : java.lang.ArrayIndexOutOfBoundsException: 3
这一行:
val rowRDD = employee.map(_.split('|')).map(e => Row(e(0),e(1),e(2), e(3) ));
您假设 employee.map(_.split('|'))
的结果至少有四个元素,但第二行只有 3 个,因此出现索引越界异常。
举例说明:
scala> val oneRow = "1532|DAVE|50|".split('|')
oneRow: Array[String] = Array(1532, DAVE, 50)
scala> oneRow(3)
java.lang.ArrayIndexOutOfBoundsException: 3
我们应该这样拆分它:
val schema = StructType(
schemaString
.split("|",-1)
.map( fieldName => StructField(fieldName,StringType,true) )
);
val rowRDD = employee
.map( _.split("|", -1) )
.map( e => Row(e(0),e(1),e(2),e(3)) );
我是 SPARK 的新手,所以尝试编写一个小程序并 运行 出现以下错误。 有人可以帮忙吗?
仅供参考 - 当示例文件的列中没有空数据时,该程序似乎可以正常工作,但问题似乎是由于第二行中的空值所致。
数据:TEMP_EMP.dat
的内容1232|JOHN|30|IT
1532|DAVE|50|
1542|JEN|25|QA
用于将此数据解析为数据帧的 SCALA 代码
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType, StructField, StringType};
val employee = sc.textFile("file:///TEMP_EMP.dat")
val textFileTemp = sc.textFile("file:///TEMP_EMP.dat");
val schemaString = "ID|NAME|AGE|DEPT";
val schema = StructType(schemaString.split('|').map(fieldName=>StructField(fieldName,StringType,true)));
val rowRDD = employee.map(_.split('|')).map(e => Row(e(0),e(1),e(2), e(3) ));
val employeeDF = sqlContext.createDataFrame(rowRDD, schema);
employeeDF.registerTempTable("employee");
val allrecords = sqlContext.sql("SELECT * FROM employee");
allrecords.show();
错误日志:
WARN 2016-08-17 13:36:21,006 org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 6.0 : java.lang.ArrayIndexOutOfBoundsException: 3
这一行:
val rowRDD = employee.map(_.split('|')).map(e => Row(e(0),e(1),e(2), e(3) ));
您假设 employee.map(_.split('|'))
的结果至少有四个元素,但第二行只有 3 个,因此出现索引越界异常。
举例说明:
scala> val oneRow = "1532|DAVE|50|".split('|')
oneRow: Array[String] = Array(1532, DAVE, 50)
scala> oneRow(3)
java.lang.ArrayIndexOutOfBoundsException: 3
我们应该这样拆分它:
val schema = StructType(
schemaString
.split("|",-1)
.map( fieldName => StructField(fieldName,StringType,true) )
);
val rowRDD = employee
.map( _.split("|", -1) )
.map( e => Row(e(0),e(1),e(2),e(3)) );