值 reduceByKey 不是 org.apache.spark.rdd.RDD 的成员
value reduceByKey is not a member of org.apache.spark.rdd.RDD
非常sad.Myspark版本是2.1.1,Scala版本是2.11
import org.apache.spark.SparkContext._
import com.mufu.wcsa.component.dimension.{DimensionKey, KeyTrait}
import com.mufu.wcsa.log.LogRecord
import org.apache.spark.rdd.RDD
object PV {
//
def stat[C <: LogRecord,K <:DimensionKey](statTrait: KeyTrait[C ,K],logRecords: RDD[C]): RDD[(K,Int)] = {
val t = logRecords.map(record =>(statTrait.getKey(record),1)).reduceByKey((x,y) => x + y)
我遇到了这个错误
at 1502387780429
[ERROR] /Users/lemanli/work/project/newcma/wcsa/wcsa_my/wcsavistor/src/main/scala/com/mufu/wcsa/component/stat/PV.scala:25: error: value reduceByKey is not a member of org.apache.spark.rdd.RDD[(K, Int)]
[ERROR] val t = logRecords.map(record =>(statTrait.getKey(record),1)).reduceByKey((x,y) => x + y)
定义了一个特征
trait KeyTrait[C <: LogRecord,K <: DimensionKey]{
def getKey(c:C):K
}
It is compiled,Thanks.
def stat[C <: LogRecord,K <:DimensionKey : ClassTag : Ordering](statTrait: KeyTrait[C ,K],logRecords: RDD[C]): RDD[(K,Int)] = {
val t = logRecords.map(record =>(statTrait.getKey(record),1)).reduceByKey((x,y) => x + y)
Key 需要覆盖 Ordering[T]。
object ClientStat extends KeyTrait[DetailLogRecord, ClientStat] {
implicit val c
lientStatSorting = new Ordering[ClientStat] {
override def compare(x: ClientStat, y: ClientStat): Int = x.key.compare(y.key)
}
def getKey(detailLogRecord: DetailLogRecord): ClientStat = new ClientStat(detailLogRecord)
}
reduceByKey
是一种只在元组的RDD上定义的方法,即RDD[(K, V)]
(K,V只是约定俗成的说first是key,second是value)。
从示例中不确定您要实现的目标,但可以肯定的是,您需要将 RDD 中的值转换为两个值的元组。
这来自于一般地使用 pair rdd 函数。 reduceByKey
方法实际上是PairRDDFunctions
class的一个方法,从RDD
有一个隐式转换:
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V]
所以它需要几个隐式类型class。通常在使用简单的具体类型时,这些类型已经在范围内。但是您应该能够修改您的方法以也需要那些相同的隐式:
def stat[C <: LogRecord,K <:DimensionKey](statTrait: KeyTrait[C ,K],logRecords: RDD[C])(implicit kt: ClassTag[K], ord: Ordering[K])
或者使用更新的语法:
def stat[C <: LogRecord,K <:DimensionKey : ClassTag : Ordering](statTrait: KeyTrait[C ,K],logRecords: RDD[C])
非常sad.Myspark版本是2.1.1,Scala版本是2.11
import org.apache.spark.SparkContext._
import com.mufu.wcsa.component.dimension.{DimensionKey, KeyTrait}
import com.mufu.wcsa.log.LogRecord
import org.apache.spark.rdd.RDD
object PV {
//
def stat[C <: LogRecord,K <:DimensionKey](statTrait: KeyTrait[C ,K],logRecords: RDD[C]): RDD[(K,Int)] = {
val t = logRecords.map(record =>(statTrait.getKey(record),1)).reduceByKey((x,y) => x + y)
我遇到了这个错误
at 1502387780429
[ERROR] /Users/lemanli/work/project/newcma/wcsa/wcsa_my/wcsavistor/src/main/scala/com/mufu/wcsa/component/stat/PV.scala:25: error: value reduceByKey is not a member of org.apache.spark.rdd.RDD[(K, Int)]
[ERROR] val t = logRecords.map(record =>(statTrait.getKey(record),1)).reduceByKey((x,y) => x + y)
定义了一个特征
trait KeyTrait[C <: LogRecord,K <: DimensionKey]{
def getKey(c:C):K
}
It is compiled,Thanks.
def stat[C <: LogRecord,K <:DimensionKey : ClassTag : Ordering](statTrait: KeyTrait[C ,K],logRecords: RDD[C]): RDD[(K,Int)] = {
val t = logRecords.map(record =>(statTrait.getKey(record),1)).reduceByKey((x,y) => x + y)
Key 需要覆盖 Ordering[T]。
object ClientStat extends KeyTrait[DetailLogRecord, ClientStat] {
implicit val c
lientStatSorting = new Ordering[ClientStat] {
override def compare(x: ClientStat, y: ClientStat): Int = x.key.compare(y.key)
}
def getKey(detailLogRecord: DetailLogRecord): ClientStat = new ClientStat(detailLogRecord)
}
reduceByKey
是一种只在元组的RDD上定义的方法,即RDD[(K, V)]
(K,V只是约定俗成的说first是key,second是value)。
从示例中不确定您要实现的目标,但可以肯定的是,您需要将 RDD 中的值转换为两个值的元组。
这来自于一般地使用 pair rdd 函数。 reduceByKey
方法实际上是PairRDDFunctions
class的一个方法,从RDD
有一个隐式转换:
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V]
所以它需要几个隐式类型class。通常在使用简单的具体类型时,这些类型已经在范围内。但是您应该能够修改您的方法以也需要那些相同的隐式:
def stat[C <: LogRecord,K <:DimensionKey](statTrait: KeyTrait[C ,K],logRecords: RDD[C])(implicit kt: ClassTag[K], ord: Ordering[K])
或者使用更新的语法:
def stat[C <: LogRecord,K <:DimensionKey : ClassTag : Ordering](statTrait: KeyTrait[C ,K],logRecords: RDD[C])