使用 Java API 创建一个简单的 1 行 Spark DataFrame
Creating a simple 1-row Spark DataFrame with Java API
在 Scala 中,我可以从内存中的字符串创建单行 DataFrame,如下所示:
val stringAsList = List("buzz")
val df = sqlContext.sparkContext.parallelize(jsonValues).toDF("fizz")
df.show()
当df.show()
运行时,输出:
+-----+
| fizz|
+-----+
| buzz|
+-----+
现在我正在尝试从 Java class. 内部执行此操作,显然 JavaRDD
没有 toDF(String)
方法。我试过:
List<String> stringAsList = new ArrayList<String>();
stringAsList.add("buzz");
SQLContext sqlContext = new SQLContext(sparkContext);
DataFrame df = sqlContext.createDataFrame(sparkContext
.parallelize(stringAsList), StringType);
df.show();
...但似乎仍然不足。现在当 df.show();
执行时,我得到:
++
||
++
||
++
(一个空的 DF。)所以我问:使用 Java API,如何将内存中的字符串读入 DataFrame其中只有 1 行和 1 列,并指定该列的名称?(以便 df.show()
与上面的 Scala 相同)?
您可以通过为 Rdd 创建 List 而不是创建将包含列名称的 Schema 来实现此目的。
可能还有其他方法,这只是其中一种。
List<String> stringAsList = new ArrayList<String>();
stringAsList.add("buzz");
JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map((String row) -> {
return RowFactory.create(row);
});
StructType schema = DataTypes.createStructType(new StructField[] { DataTypes.createStructField("fizz", DataTypes.StringType, false) });
DataFrame df = sqlContext.createDataFrame(rowRDD, schema).toDF();
df.show();
//+----+
|fizz|
+----+
|buzz|
如果您需要升级,我已经为 Spark 2 创建了 2 个示例:
简单 Fizz/Buzz(或 foe/bar - 老一代 :)):
SparkSession spark = SparkSession.builder().appName("Build a DataFrame from Scratch").master("local[*]")
.getOrCreate();
List<String> stringAsList = new ArrayList<>();
stringAsList.add("bar");
JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map((String row) -> RowFactory.create(row));
// Creates schema
StructType schema = DataTypes.createStructType(
new StructField[] { DataTypes.createStructField("foe", DataTypes.StringType, false) });
Dataset<Row> df = spark.sqlContext().createDataFrame(rowRDD, schema).toDF();
2x2 数据:
SparkSession spark = SparkSession.builder().appName("Build a DataFrame from Scratch").master("local[*]")
.getOrCreate();
List<String[]> stringAsList = new ArrayList<>();
stringAsList.add(new String[] { "bar1.1", "bar2.1" });
stringAsList.add(new String[] { "bar1.2", "bar2.2" });
JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map((String[] row) -> RowFactory.create(row));
// Creates schema
StructType schema = DataTypes
.createStructType(new StructField[] { DataTypes.createStructField("foe1", DataTypes.StringType, false),
DataTypes.createStructField("foe2", DataTypes.StringType, false) });
Dataset<Row> df = spark.sqlContext().createDataFrame(rowRDD, schema).toDF();
代码可以从以下网址下载:https://github.com/jgperrin/net.jgp.labs.spark。
基于@jgp 建议的内容。如果您想为 混合类型 执行此操作,您可以执行以下操作:
List<Tuple2<Integer, Boolean>> mixedTypes = Arrays.asList(
new Tuple2<>(1, false),
new Tuple2<>(1, false),
new Tuple2<>(1, false));
JavaRDD<Row> rowRDD = sparkContext.parallelize(mixedTypes).map(row -> RowFactory.create(row._1, row._2));
StructType mySchema = new StructType()
.add("id", DataTypes.IntegerType, false)
.add("flag", DataTypes.BooleanType, false);
Dataset<Row> df = spark.sqlContext().createDataFrame(rowRDD, mySchema).toDF();
这可能有助于解决 @jdk2588 的问题。
这里post提供了一个不经过sparkContext.parallelize(...)
的解决方案:https://timepasstechies.com/create-spark-dataframe-java-list/
在 Scala 中,我可以从内存中的字符串创建单行 DataFrame,如下所示:
val stringAsList = List("buzz")
val df = sqlContext.sparkContext.parallelize(jsonValues).toDF("fizz")
df.show()
当df.show()
运行时,输出:
+-----+
| fizz|
+-----+
| buzz|
+-----+
现在我正在尝试从 Java class. 内部执行此操作,显然 JavaRDD
没有 toDF(String)
方法。我试过:
List<String> stringAsList = new ArrayList<String>();
stringAsList.add("buzz");
SQLContext sqlContext = new SQLContext(sparkContext);
DataFrame df = sqlContext.createDataFrame(sparkContext
.parallelize(stringAsList), StringType);
df.show();
...但似乎仍然不足。现在当 df.show();
执行时,我得到:
++
||
++
||
++
(一个空的 DF。)所以我问:使用 Java API,如何将内存中的字符串读入 DataFrame其中只有 1 行和 1 列,并指定该列的名称?(以便 df.show()
与上面的 Scala 相同)?
您可以通过为 Rdd 创建 List 而不是创建将包含列名称的 Schema 来实现此目的。
可能还有其他方法,这只是其中一种。
List<String> stringAsList = new ArrayList<String>();
stringAsList.add("buzz");
JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map((String row) -> {
return RowFactory.create(row);
});
StructType schema = DataTypes.createStructType(new StructField[] { DataTypes.createStructField("fizz", DataTypes.StringType, false) });
DataFrame df = sqlContext.createDataFrame(rowRDD, schema).toDF();
df.show();
//+----+
|fizz|
+----+
|buzz|
如果您需要升级,我已经为 Spark 2 创建了 2 个示例:
简单 Fizz/Buzz(或 foe/bar - 老一代 :)):
SparkSession spark = SparkSession.builder().appName("Build a DataFrame from Scratch").master("local[*]")
.getOrCreate();
List<String> stringAsList = new ArrayList<>();
stringAsList.add("bar");
JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map((String row) -> RowFactory.create(row));
// Creates schema
StructType schema = DataTypes.createStructType(
new StructField[] { DataTypes.createStructField("foe", DataTypes.StringType, false) });
Dataset<Row> df = spark.sqlContext().createDataFrame(rowRDD, schema).toDF();
2x2 数据:
SparkSession spark = SparkSession.builder().appName("Build a DataFrame from Scratch").master("local[*]")
.getOrCreate();
List<String[]> stringAsList = new ArrayList<>();
stringAsList.add(new String[] { "bar1.1", "bar2.1" });
stringAsList.add(new String[] { "bar1.2", "bar2.2" });
JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map((String[] row) -> RowFactory.create(row));
// Creates schema
StructType schema = DataTypes
.createStructType(new StructField[] { DataTypes.createStructField("foe1", DataTypes.StringType, false),
DataTypes.createStructField("foe2", DataTypes.StringType, false) });
Dataset<Row> df = spark.sqlContext().createDataFrame(rowRDD, schema).toDF();
代码可以从以下网址下载:https://github.com/jgperrin/net.jgp.labs.spark。
基于@jgp 建议的内容。如果您想为 混合类型 执行此操作,您可以执行以下操作:
List<Tuple2<Integer, Boolean>> mixedTypes = Arrays.asList(
new Tuple2<>(1, false),
new Tuple2<>(1, false),
new Tuple2<>(1, false));
JavaRDD<Row> rowRDD = sparkContext.parallelize(mixedTypes).map(row -> RowFactory.create(row._1, row._2));
StructType mySchema = new StructType()
.add("id", DataTypes.IntegerType, false)
.add("flag", DataTypes.BooleanType, false);
Dataset<Row> df = spark.sqlContext().createDataFrame(rowRDD, mySchema).toDF();
这可能有助于解决 @jdk2588 的问题。
这里post提供了一个不经过sparkContext.parallelize(...)
的解决方案:https://timepasstechies.com/create-spark-dataframe-java-list/