Average word length in Spark
I have a list of values and the total length of all their occurrences, as an array.
For example, if my sentence is
"I have a cat. The cat looks very cute"
my array looks like
Array((I,1), (have,4), (a,1), (cat,6), (The, 3), (looks, 5), (very ,4), (cute,4))
Now I want to calculate the average length of each word, i.e. total length / number of occurrences.
I tried coding this in Scala as follows:
val avglen = arr.reduceByKey( (x,y) => (x, y.toDouble / x.size.toDouble) )
and I get the following error at x.size:
error: value size is not a member of Int
Please help me figure out where I went wrong.
If I understand correctly:
val rdd: RDD[(String, Int)] = ???
val ave: RDD[(String, Double)] =
  rdd.map { case (name, numOccurance) =>
    (name, name.length.toDouble / numOccurance)
  }
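For a quick sanity check, here's a minimal sketch (assuming a live SparkContext named sc; the sample pairs are taken from the question):

// Hypothetical check of the map-based approach above
import org.apache.spark.rdd.RDD
val rdd: RDD[(String, Int)] = sc.parallelize(Array(("I", 1), ("have", 4), ("cat", 6)))
val ave: RDD[(String, Double)] = rdd.map { case (name, numOccurance) =>
  (name, name.length.toDouble / numOccurance)
}
ave.collect().foreach(println)  // prints one (word, ratio) pair per input tuple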
This is a slightly confusing question. If your data is already sitting in an Array[(String, Int)] collection (presumably after a collect() to the driver), then you don't need any RDD transformations at all. In fact, there's a nifty fold*() trick for getting the average over the collection:
val average = arr.foldLeft(0.0) { case (sum, (_, count)) => sum + count } /
  arr.foldLeft(0.0) { case (sum, (word, count)) => sum + count.toDouble / word.length }
A bit wordy, but it basically totals the number of characters in the numerator and the number of word occurrences in the denominator. Running it on your example, I see the following:
scala> val arr = Array(("I",1), ("have",4), ("a",1), ("cat",6), ("The", 3), ("looks", 5), ("very" ,4), ("cute",4))
arr: Array[(String, Int)] = Array((I,1), (have,4), (a,1), (cat,6), (The,3), (looks,5), (very,4), (cute,4))
scala> val average = ...
average: Double = 3.111111111111111
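If you'd rather traverse the array only once, both sums can be carried through a single foldLeft over a pair; a sketch under the same Array[(String, Int)] assumption:

// Single pass: accumulate (total characters, total occurrences) together
val (charSum, occSum) = arr.foldLeft((0.0, 0.0)) {
  case ((chars, occs), (word, count)) =>
    (chars + count, occs + count.toDouble / word.length)
}
val average = charSum / occSum  // 3.111... on the example data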
If your (String, Int) tuples are distributed across an RDD[(String, Int)], you can solve this easily with accumulators:
val chars = sc.accumulator(0.0)   // total characters seen
val words = sc.accumulator(0.0)   // total word occurrences
wordsRDD.foreach { case (word: String, count: Int) =>
  chars += count; words += count.toDouble / word.length
}
val average = chars.value / words.value
When running on the example above (placed into an RDD), I see the following:
scala> val arr = Array(("I",1), ("have",4), ("a",1), ("cat",6), ("The", 3), ("looks", 5), ("very" ,4), ("cute",4))
arr: Array[(String, Int)] = Array((I,1), (have,4), (a,1), (cat,6), (The,3), (looks,5), (very,4), (cute,4))
scala> val wordsRDD = sc.parallelize(arr)
wordsRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:14
scala> val chars = sc.accumulator(0.0)
chars: org.apache.spark.Accumulator[Double] = 0.0
scala> val words = sc.accumulator(0.0)
words: org.apache.spark.Accumulator[Double] = 0.0
scala> wordsRDD.foreach { case (word: String, count: Int) =>
| chars += count; words += count / word.length
| }
...
scala> val average = chars.value / words.value
average: Double = 3.111111111111111
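If you'd rather not use accumulators at all, the same two totals can be computed in one distributed pass with aggregate(); a minimal sketch assuming the same wordsRDD as above:

// seqOp folds a (word, count) pair into the running (chars, occurrences) totals;
// combOp merges the per-partition totals
val (totalChars, totalOccs) = wordsRDD.aggregate((0.0, 0.0))(
  { case ((c, o), (word, count)) => (c + count, o + count.toDouble / word.length) },
  { case ((c1, o1), (c2, o2)) => (c1 + c2, o1 + o2) }
)
val average = totalChars / totalOccs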
After reading your comments, I think I see it:
val words = sc.parallelize(Array(("i", 1), ("have", 4),
                                 ("a", 1), ("cat", 6),
                                 ("the", 3), ("looks", 5),
                                 ("very", 4), ("cute", 4)))
val avgs = words.map { case (word, count) => (word, count / word.length.toDouble) }
println("My averages are: ")
avgs.take(100).foreach(println)
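Note that take(100) brings back at most 100 pairs. If you want the full result (and it is small enough for the driver), or want it persisted instead, a sketch (the output path is hypothetical):

// Assumes the result fits in driver memory
avgs.collect().foreach(println)
// Or skip the driver round-trip and write it out directly:
avgs.saveAsTextFile("hdfs:///tmp/word_averages")  // hypothetical path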
Suppose you have a paragraph containing these words and you want to compute the average size of the words in that paragraph.
In two steps, using a map-reduce approach, on spark-1.5.1:
val words = sc.parallelize(Array(("i", 1), ("have", 4),
                                 ("a", 1), ("cat", 6),
                                 ("the", 3), ("looks", 5),
                                 ("very", 4), ("cute", 4)))
val wordCount = words.map { case (word, count) => count}.reduce((a, b) => a + b)
val wordLength = words.map { case (word, count) => word.length * count}.reduce((a, b) => a + b)
println("The avg length is: " + wordLength / wordCount.toDouble)
I ran this code in an .ipynb connected to spark-kernel, and this is the output.
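As a side note, the two jobs above each scan the RDD once; if you want a single pass, both sums can be folded into one reduce (a sketch over the same words RDD):

// Map each pair to (weighted length, count), then sum both components at once
val (wordLength, wordCount) = words
  .map { case (word, count) => (word.length * count, count) }
  .reduce { case ((l1, c1), (l2, c2)) => (l1 + l2, c1 + c2) }
println("The avg length is: " + wordLength / wordCount.toDouble)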