`map` 和 `reduce` 方法如何在 Spark RDD 中工作?

How do `map` and `reduce` methods work in Spark RDDs?

以下代码来自Apache Spark快速入门指南。 有人可以解释一下 "line" 变量是什么以及它来自哪里吗?

textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

此外,值如何传递到 a、b?

Link 到 QSG http://spark.apache.org/docs/latest/quick-start.html

Mapreduce是RDDclass的方法,其接口类似于scala集合。

你传递给方法mapreduce的实际上是匿名函数(在map中有一个参数,在reduce中有两个参数)。 textFile 为它具有的每个元素(在此上下文中的文本行)调用提供的函数。

也许你应该先阅读一些 scala 集合介绍。

您可以在此处阅读有关 RDD class API 的更多信息: https://spark.apache.org/docs/1.2.1/api/scala/#org.apache.spark.rdd.RDD

首先,根据你的link,textfile被创建为

val textFile = sc.textFile("README.md")

这样 textfile 是一个 RDD[String],这意味着它是一个 String 类型的弹性分布式数据集。 API 访问与常规 Scala 集合非常相似。

现在这个 map 有什么作用?

假设您有一个 String 列表,并希望将其转换为一个 Int 列表,代表每个字符串的长度。

val stringlist: List[String] = List("ab", "cde", "f")
val intlist: List[Int] = stringlist.map( x => x.length )

map 方法需要一个函数。一个函数,来自 String => Int。使用该函数,列表中的每个元素都会被转换。所以intlist的值为List( 2, 3, 1 )

在这里,我们从 String => Int 创建了一个匿名函数。即x => x.length。甚至可以将函数写得更明确,如

stringlist.map( (x: String) => x.length )  

如果你确实使用上面的显式写法,你可以

val stringLength : (String => Int) = {
  x => x.length
}
val intlist = stringlist.map( stringLength )

所以,这里很明显,stringLength 是一个从 StringInt 的函数。

备注:一般来说,map就是所谓的Functor。当您从 A => B 提供函数时,仿函数(此处为 List)的 map 允许您使用该函数也从 List[A] => List[B] 开始。这叫提升。

问题的答案

What is the "line" variable?

上面说到line是函数的入参line => line.split(" ").size

更明确 (line: String) => line.split(" ").size

例子:如果line是"hello world",函数returns 2.

"hello world" 
=> Array("hello", "world")  // split 
=> 2                        // size of Array

How does a value get passed into a,b?

reduce 还需要一个来自 (A, A) => A 的函数,其中 A 是您的 RDD 的类型。让我们调用这个函数 op.

什么reduce。示例:

List( 1, 2, 3, 4 ).reduce( (x,y) => x + y )
Step 1 : op( 1, 2 ) will be the first evaluation. 
  Start with 1, 2, that is 
    x is 1  and  y is 2
Step 2:  op( op( 1, 2 ), 3 ) - take the next element 3
  Take the next element 3: 
    x is op(1,2) = 3   and y = 3
Step 3:  op( op( op( 1, 2 ), 3 ), 4) 
  Take the next element 4: 
    x is op(op(1,2), 3 ) = op( 3,3 ) = 6    and y is 4

这里的结果是列表元素的总和,10。

备注: 一般reduce计算

op( op( ... op(x_1, x_2) ..., x_{n-1}), x_n)

完整示例

首先,textfile是一个RDD[String],比如说

TextFile
 "hello Tyth"
 "cool example, eh?"
 "goodbye"

TextFile.map(line => line.split(" ").size)
 2
 3
 1
TextFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
 3
   Steps here, recall `(a, b) => if (a > b) a else b)`
   - op( op(2, 3), 1) evaluates to op(3, 1), since op(2, 3) = 3 
   - op( 3, 1 ) = 3

map 函数的作用是,它获取参数列表并将其映射到某个函数。类似于 python 中的 map 函数,如果你熟悉的话。

此外,文件就像一个字符串列表。 (不完全是,但这就是迭代的方式)

我们假设这是您的文件。

val list_a: List[String] = List("first line", "second line", "last line")

现在让我们看看map函数是如何工作的。

我们需要两件事,list of values 我们已经有了,function 我们想映射这个值。让我们考虑真正简单的功能来理解。

val myprint = (arg:String)=>println(arg)

此函数只接受单个字符串参数并在控制台上打印。

myprint("hello world")
hello world

如果我们将此函数与您的列表相匹配,它将打印所有行

list_a.map(myprint)

我们也可以像下面提到的那样写一个匿名函数,它做同样的事情。

list_a.map(arg=>println(arg))

在您的例子中,line 是文件的第一行。您可以根据需要更改参数名称。例如,在上面的示例中,如果我将 arg 更改为 line,它将毫无问题地工作

list_a.map(line=>println(line))