Spark RDD to Dataframe with schema specifying
When converting an RDD to a DataFrame via Row objects, Spark seems unable to apply a schema to the DataFrame with any type other than String. I have tried this on both Spark 1.4 and 1.5.
Snippet (Java API):
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

directKafkaStream.foreachRDD(rdd -> {
    rdd.foreach(x -> System.out.println("x._1() = " + x._1()));
    rdd.foreach(x -> System.out.println("x._2() = " + x._2()));

    JavaRDD<Row> rowRdd = rdd.map(x -> RowFactory.create(x._2().split("\t")));
    rowRdd.foreach(x -> System.out.println("x = " + x));

    SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());

    StructField id = DataTypes.createStructField("id", DataTypes.IntegerType, true);
    StructField name = DataTypes.createStructField("name", DataTypes.StringType, true);
    List<StructField> fields = Arrays.asList(id, name);
    StructType schema = DataTypes.createStructType(fields);

    DataFrame sampleDf = sqlContext.createDataFrame(rowRdd, schema);
    sampleDf.printSchema();
    sampleDf.show();

    return null;
});

jssc.start();
jssc.awaitTermination();
如果为 "id" 字段指定 DataTypes.StringType,它会产生以下输出:
x._1() = null
x._2() = 1 item1
x = [1,item1]
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
+---+-----+
| id| name|
+---+-----+
| 1|item1|
+---+-----+
For the code as given (with IntegerType for "id"), it throws an error:
x._1() = null
x._2() = 1 item1
x = [1,item1]
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
15/09/16 04:13:33 ERROR JobScheduler: Error running job streaming job 1442402013000 ms.0
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:40)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:220)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$IntConverter$.toScalaImpl(CatalystTypeConverters.scala:358)
A similar issue is reported on the Spark Confluence, but it was marked as resolved as of version 1.3.
You are mixing two different things here: data types and the DataFrame schema. When you create a Row like this:

RowFactory.create(x._2().split("\t"))

you get Row(_: String, _: String), but your schema states that you have Row(_: Integer, _: String). Since there is no automatic type conversion, you get the error.
To make it work, you can either convert the values when you create the rows, or define id as StringType and use the Column.cast method after the DataFrame has been created. Both options are sketched below.
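A minimal sketch of the first option, converting the values while building each Row so the runtime types match the declared schema (rdd is the variable from the snippet in the question):

JavaRDD<Row> rowRdd = rdd.map(x -> {
    String[] parts = x._2().split("\t");
    // Parse the first field so the Row carries a real Integer for "id",
    // matching DataTypes.IntegerType in the schema.
    return RowFactory.create(Integer.parseInt(parts[0]), parts[1]);
});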
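And a sketch of the second option, keeping "id" as StringType in the schema and casting the column once the DataFrame exists (sqlContext and rowRdd as in the question):

StructField id = DataTypes.createStructField("id", DataTypes.StringType, true);
StructField name = DataTypes.createStructField("name", DataTypes.StringType, true);
StructType schema = DataTypes.createStructType(Arrays.asList(id, name));

DataFrame raw = sqlContext.createDataFrame(rowRdd, schema);
// Column.cast converts the string ids to integers after the fact.
DataFrame sampleDf = raw.withColumn("id", raw.col("id").cast(DataTypes.IntegerType));
sampleDf.printSchema();  // id: integer (nullable = true)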