DataFrame 到 RDD[(String, String)] 的转换
DataFrame to RDD[(String, String)] conversion
我想在 Databricks 中将 org.apache.spark.sql.DataFrame
转换为 org.apache.spark.rdd.RDD[(String, String)]
。 谁能帮忙?
背景(也欢迎更好的解决方案):我有一个 Kafka 流,它(经过一些步骤后)变成了一个 2 列数据框。我想将其放入 Redis 缓存中,第一列作为键,第二列作为值。
更具体地说 输入的类型是这样的:lastContacts: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: bigint]
。我尝试如下放入Redis:
sc.toRedisKV(lastContacts)(redisConfig)
错误消息如下所示:
notebook:20: error: type mismatch;
found : org.apache.spark.sql.DataFrame
(which expands to) org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[(String, String)]
sc.toRedisKV(lastContacts)(redisConfig)
我已经尝试过一些想法(比如函数 .rdd
),但 none 有所帮助。
如果要将行映射到不同的 RDD 元素,可以使用 df.map(row => ...) 将数据帧转换为 RDD。
例如:
val df = Seq(("table1",432),
("table2",567),
("table3",987),
("table1",789)).
toDF("tablename", "Code").toDF()
df.show()
+---------+----+
|tablename|Code|
+---------+----+
| table1| 432|
| table2| 567|
| table3| 987|
| table1| 789|
+---------+----+
val rddDf = df.map(r => (r(0), r(1))).rdd // Type:RDD[(Any,Any)]
OR
val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd //Type: RDD[(String,String)]
请参阅 https://community.hortonworks.com/questions/106500/error-in-spark-streaming-kafka-integration-structu.html 关于 AnalysisException:必须使用 writeStream.start()
执行带有流源的查询
您需要等待使用查询终止查询。awaitTermination()
防止进程在查询处于活动状态时退出。
我想在 Databricks 中将 org.apache.spark.sql.DataFrame
转换为 org.apache.spark.rdd.RDD[(String, String)]
。 谁能帮忙?
背景(也欢迎更好的解决方案):我有一个 Kafka 流,它(经过一些步骤后)变成了一个 2 列数据框。我想将其放入 Redis 缓存中,第一列作为键,第二列作为值。
更具体地说 输入的类型是这样的:lastContacts: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: bigint]
。我尝试如下放入Redis:
sc.toRedisKV(lastContacts)(redisConfig)
错误消息如下所示:
notebook:20: error: type mismatch;
found : org.apache.spark.sql.DataFrame
(which expands to) org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[(String, String)]
sc.toRedisKV(lastContacts)(redisConfig)
我已经尝试过一些想法(比如函数 .rdd
),但 none 有所帮助。
如果要将行映射到不同的 RDD 元素,可以使用 df.map(row => ...) 将数据帧转换为 RDD。
例如:
val df = Seq(("table1",432),
("table2",567),
("table3",987),
("table1",789)).
toDF("tablename", "Code").toDF()
df.show()
+---------+----+
|tablename|Code|
+---------+----+
| table1| 432|
| table2| 567|
| table3| 987|
| table1| 789|
+---------+----+
val rddDf = df.map(r => (r(0), r(1))).rdd // Type:RDD[(Any,Any)]
OR
val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd //Type: RDD[(String,String)]
请参阅 https://community.hortonworks.com/questions/106500/error-in-spark-streaming-kafka-integration-structu.html 关于 AnalysisException:必须使用 writeStream.start()
执行带有流源的查询您需要等待使用查询终止查询。awaitTermination() 防止进程在查询处于活动状态时退出。