Spark Couchbase 连接器 - N1QL RDD 到 DataFrame
Spark Couchbase Connector - N1QL RDD to DataFrame
我正在尝试将 RDD 表单 couchbase 转换为 DataFrame(scala 2.11 - 和 spark 2.1)但出现过载错误,我的代码在下面,有什么想法吗?另一个线程没有完全回答这个问题。
我在 Databricks 笔记本中执行此操作,并且我将 couch 连接器用于纯 DataFrames 很好,但是如果我想执行客户 N1QL 查询,更定制一些,这是我能想到的最好的方法,首先使用 RDD?
首先,有没有更好的方法在原生 Dataframe 中执行此查询?我想我需要使用 n1qL 和 RDD 还是我在这里遗漏了什么?
请告诉我我在下面的 RDD 转换代码中做错了什么,我也得到了 :84: 错误:重载方法值 createDataFrame 与替代:错误....谢谢!
val reconciliationSchema =
new StructType()
.add("numEvents", IntegerType)
.add("eventCategory", StringType)
.add("eventName", StringType)
val orderEventsCouchbaseQuery = """
SELECT
count(*) as numEvents, event.eventCategory, event.eventName
FROM
events
WHERE
STR_TO_UTC(event.eventOccurredTime)
BETWEEN STR_TO_UTC("2017-06-16") AND STR_TO_UTC("2017-06-26")
GROUP BY event.eventCategory, event.eventName
order by event.eventCategory, event.eventName
"""
val queryResultRDD = sc.couchbaseQuery(N1qlQuery.simple(orderEventsCouchbaseQuery),"events").map(_.value)
val queryResultDF: DataFrame = spark.createDataFrame(queryResultRDD,reconciliationSchema)
display(queryResultDF)
我认为您 运行 遇到的问题与其说是与 couchbase 相关的问题,不如说是 spark/scala 类型推断问题。当您使用 createDataFrame
时,在这种情况下,spark 需要使用 Row
而不是 return 类型的 couchbase 查询用于该 rdd。
所以这里有一些类似的示例代码,您可以看到当变成一行时它工作正常:
val query = N1qlQuery.simple("" +
"select country, count(*) as count " +
"from `travel-sample` " +
"where type = 'airport' " +
"group by country " +
"order by count desc")
val schema = StructType(
StructField("count", IntegerType) ::
StructField("country", StringType) :: Nil
)
val rdd = spark.sparkContext.couchbaseQuery(query).map(r => Row(r.value.getInt("count"), r.value.getString("country")))
spark.createDataFrame(rdd, schema).show()
我正在尝试将 RDD 表单 couchbase 转换为 DataFrame(scala 2.11 - 和 spark 2.1)但出现过载错误,我的代码在下面,有什么想法吗?另一个线程没有完全回答这个问题。
我在 Databricks 笔记本中执行此操作,并且我将 couch 连接器用于纯 DataFrames 很好,但是如果我想执行客户 N1QL 查询,更定制一些,这是我能想到的最好的方法,首先使用 RDD?
首先,有没有更好的方法在原生 Dataframe 中执行此查询?我想我需要使用 n1qL 和 RDD 还是我在这里遗漏了什么?
请告诉我我在下面的 RDD 转换代码中做错了什么,我也得到了 :84: 错误:重载方法值 createDataFrame 与替代:错误....谢谢!
val reconciliationSchema =
new StructType()
.add("numEvents", IntegerType)
.add("eventCategory", StringType)
.add("eventName", StringType)
val orderEventsCouchbaseQuery = """
SELECT
count(*) as numEvents, event.eventCategory, event.eventName
FROM
events
WHERE
STR_TO_UTC(event.eventOccurredTime)
BETWEEN STR_TO_UTC("2017-06-16") AND STR_TO_UTC("2017-06-26")
GROUP BY event.eventCategory, event.eventName
order by event.eventCategory, event.eventName
"""
val queryResultRDD = sc.couchbaseQuery(N1qlQuery.simple(orderEventsCouchbaseQuery),"events").map(_.value)
val queryResultDF: DataFrame = spark.createDataFrame(queryResultRDD,reconciliationSchema)
display(queryResultDF)
我认为您 运行 遇到的问题与其说是与 couchbase 相关的问题,不如说是 spark/scala 类型推断问题。当您使用 createDataFrame
时,在这种情况下,spark 需要使用 Row
而不是 return 类型的 couchbase 查询用于该 rdd。
所以这里有一些类似的示例代码,您可以看到当变成一行时它工作正常:
val query = N1qlQuery.simple("" +
"select country, count(*) as count " +
"from `travel-sample` " +
"where type = 'airport' " +
"group by country " +
"order by count desc")
val schema = StructType(
StructField("count", IntegerType) ::
StructField("country", StringType) :: Nil
)
val rdd = spark.sparkContext.couchbaseQuery(query).map(r => Row(r.value.getInt("count"), r.value.getString("country")))
spark.createDataFrame(rdd, schema).show()