从以下代码如何将 JavaRDD<Integer> 转换为 DataFrame 或 DataSet
From the following code how to convert a JavaRDD<Integer> to DataFrame or DataSet
public static void main(String[] args) {
SparkSession sessn = SparkSession.builder().appName("RDD2DF").master("local").getOrCreate();
List<Integer> lst = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20);
Dataset<Integer> DF = sessn.createDataset(lst, Encoders.INT());
System.out.println(DF.javaRDD().getNumPartitions());
JavaRDD<Integer> mappartRdd = DF.repartition(3).javaRDD().mapPartitions(it-> Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator());
}
根据上面的代码,我无法将 JavaRdd (mappartRdd) 转换为 Java Spark 中的 DataFrame。
我正在使用下面的方法将 JavaRdd 转换为 DataFrame/DataSet.
sessn.createDataFrame(mappartRdd, beanClass);
我为 createDataFrame 尝试了多个选项和不同的重载函数。我在将其转换为 DF 时遇到问题。我需要为代码提供什么 beanclass?
与scala不同,Java中没有toDF()这样的函数将RDD转换为DataFrame。有人可以根据我的要求协助转换它吗?
注意:我可以通过修改上面的代码直接创建数据集,如下所示。
Dataset<Integer> mappartDS = DF.repartition(3).mapPartitions(it-> Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator(), Encoders.INT());
但我想知道如果我使用 createDataFrame,为什么我的 JavaRdd 没有转换为 DF/DS。任何帮助将不胜感激。
这似乎是 this SO Question
的跟进
I think, you are in learning stage of spark. I would suggest to understand the apis for java provided - https://spark.apache.org/docs/latest/api/java/index.html
关于你的问题,如果勾选createDataFrame
api,则如下-
def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
...
}
如您所见,它采用 JavaRDD[Row]
和相关的 StructType
架构作为参数。因此要创建等于 Dataset<Row>
的 DataFrame
使用下面的代码片段-
JavaRDD<Integer> mappartRdd = DF.repartition(3).javaRDD().mapPartitions(it-> Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator());
StructType schema = new StructType()
.add(new StructField("value", DataTypes.IntegerType, true, Metadata.empty()));
Dataset<Row> df = spark.createDataFrame(mappartRdd.map(RowFactory::create), schema);
df.show(false);
df.printSchema();
/**
* +-----+
* |value|
* +-----+
* |6 |
* |8 |
* |6 |
* +-----+
*
* root
* |-- value: integer (nullable = true)
*/
public static void main(String[] args) {
SparkSession sessn = SparkSession.builder().appName("RDD2DF").master("local").getOrCreate();
List<Integer> lst = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20);
Dataset<Integer> DF = sessn.createDataset(lst, Encoders.INT());
System.out.println(DF.javaRDD().getNumPartitions());
JavaRDD<Integer> mappartRdd = DF.repartition(3).javaRDD().mapPartitions(it-> Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator());
}
根据上面的代码,我无法将 JavaRdd (mappartRdd) 转换为 Java Spark 中的 DataFrame。 我正在使用下面的方法将 JavaRdd 转换为 DataFrame/DataSet.
sessn.createDataFrame(mappartRdd, beanClass);
我为 createDataFrame 尝试了多个选项和不同的重载函数。我在将其转换为 DF 时遇到问题。我需要为代码提供什么 beanclass?
与scala不同,Java中没有toDF()这样的函数将RDD转换为DataFrame。有人可以根据我的要求协助转换它吗?
注意:我可以通过修改上面的代码直接创建数据集,如下所示。
Dataset<Integer> mappartDS = DF.repartition(3).mapPartitions(it-> Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator(), Encoders.INT());
但我想知道如果我使用 createDataFrame,为什么我的 JavaRdd 没有转换为 DF/DS。任何帮助将不胜感激。
这似乎是 this SO Question
的跟进I think, you are in learning stage of spark. I would suggest to understand the apis for java provided - https://spark.apache.org/docs/latest/api/java/index.html
关于你的问题,如果勾选createDataFrame
api,则如下-
def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
...
}
如您所见,它采用 JavaRDD[Row]
和相关的 StructType
架构作为参数。因此要创建等于 Dataset<Row>
的 DataFrame
使用下面的代码片段-
JavaRDD<Integer> mappartRdd = DF.repartition(3).javaRDD().mapPartitions(it-> Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator());
StructType schema = new StructType()
.add(new StructField("value", DataTypes.IntegerType, true, Metadata.empty()));
Dataset<Row> df = spark.createDataFrame(mappartRdd.map(RowFactory::create), schema);
df.show(false);
df.printSchema();
/**
* +-----+
* |value|
* +-----+
* |6 |
* |8 |
* |6 |
* +-----+
*
* root
* |-- value: integer (nullable = true)
*/