How do I flatMap a row of arrays into multiple rows in Apache Spark using Java?
I have a JSON data file containing a property, "tags", which is an array of strings. The Apache Spark DataFrame schema looks like this:
root
|-- acceptedAnswerId: long (nullable = true)
|-- answerCount: long (nullable = true)
|-- body: string (nullable = true)
|-- score: long (nullable = true)
|-- tags: array (nullable = true)
| |-- element: string (containsNull = true)
|-- title: string (nullable = true)
|-- viewCount: long (nullable = true)
I want to explode each row into several rows in Java. I can find similar answers, but I have not been able to translate the solution into Java. Any suggestions?
The "tags" property in the JSON looks like this:
"tags":["c#",".net","compression","decompression"]
You can simply use the explode function.
DataFrame df = ... // your DataFrame containing the "tags" array column
DataFrame expanded = df.withColumn(
    "tag", org.apache.spark.sql.functions.explode(df.col("tags"))).drop("tags");
To make the solution more concrete, here is a sample JSON record:
{"id":4,"score":358,"viewCount":24247,"answerCount":13,"commentCount":1,"favoriteCount":28,"tags":["c#","winforms","type-conversion","opacity"]}
Below is the Java snippet that reads the JSON data into a DataFrame:
SparkConf conf = new SparkConf().setAppName("ExplodeTags").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
String jsonData = "{\"id\":4,\"score\":358,\"viewCount\":24247,\"tags\":[\"c#\",\"winforms\",\"type-conversion\",\"opacity\"]}";
List<String> dataSet = Arrays.asList(jsonData);
JavaRDD<String> distData = sc.parallelize(dataSet);
DataFrame stackoverflowPosts = sqlContext.read().json(distData);
stackoverflowPosts.printSchema(); // let's print out the DataFrame schema (Output#1)
stackoverflowPosts.show();        // let's show the DataFrame content (Output#2)
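Since the question mentions a JSON data file, note that the same reader also works directly against a path (with one JSON object per line); the file name below is just a placeholder:

// Hypothetical path -- adjust to your actual file location
DataFrame postsFromFile = sqlContext.read().json("path/to/posts.json");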
Schema: Output#1 looks like this:
root
|-- id: long (nullable = true)
|-- score: long (nullable = true)
|-- tags: array (nullable = true)
| |-- element: string (containsNull = true)
|-- viewCount: long (nullable = true)
Data: Output#2 looks like this:
+---+-----+--------------------+---------+
| id|score| tags|viewCount|
+---+-----+--------------------+---------+
| 4| 358|[c#, winforms, ty...| 24247|
+---+-----+--------------------+---------+
Based on the information from zero323, I proceeded:
DataFrame expanded = stackoverflowPosts.withColumn(
    "tag", org.apache.spark.sql.functions.explode(stackoverflowPosts.col("tags")));
expanded.printSchema(); //let's print out the DataFrame schema again (Output#3)
expanded.show(); //let's show the DataFrame content (Output#4)
Schema: Output#3 looks like this:
root
|-- id: long (nullable = true)
|-- score: long (nullable = true)
|-- tags: array (nullable = true)
| |-- element: string (containsNull = true)
|-- viewCount: long (nullable = true)
|-- tag: string (nullable = true)
And the data after the explode (Output#4):
+---+-----+--------------------+---------+---------------+
| id|score| tags|viewCount| tag|
+---+-----+--------------------+---------+---------------+
| 4| 358|[c#, winforms, ty...| 24247| c#|
| 4| 358|[c#, winforms, ty...| 24247| winforms|
| 4| 358|[c#, winforms, ty...| 24247|type-conversion|
| 4| 358|[c#, winforms, ty...| 24247| opacity|
+---+-----+--------------------+---------+---------------+
The result looks very similar to what you would get by joining two tables with SQL.
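For comparison, here is a sketch of the same explode expressed in SQL. This assumes the DataFrame was created with a HiveContext, whose HiveQL parser supports LATERAL VIEW in Spark 1.x; the temp table name "posts" is made up for this example:

// Assumes sqlContext is a HiveContext (a plain SQLContext's parser may not accept LATERAL VIEW)
stackoverflowPosts.registerTempTable("posts");
DataFrame viaSql = sqlContext.sql(
    "SELECT id, score, viewCount, tag FROM posts LATERAL VIEW explode(tags) t AS tag");
viaSql.show(); // produces the same four rows as Output#4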