我对 Spark 中并行操作的理解是否正确?
Is my understanding of parallel operations in Spark correct?
我是 Spark 的新手,正在尝试通过 Python 来理解 Spark 的概念。在使用 Python 为 Spark 开发应用程序时,我对以并行方式处理数据的方式感到有点困惑。
1。每个人都说我不需要担心在处理封装在 RDD 变量中的数据时会涉及到哪个节点和多少个节点。因此,根据我的最佳理解,我相信 Spark 集群会对以下代码执行的操作:
a = sc.textFile(filename)
b = a.filter(lambda x: len(x) > 0 and x.split("\t").count("9999-12-31") == 0)
c = b.collect()
可以描述为以下步骤:
(1)变量a
将保存为包含预期txt文件内容的RDD变量
(2) RDD的不同chunk a
会被广播到集群中的不同节点,并对不同节点中的每个chunk进行filter方法
(3)调用采集动作时,结果会从不同的节点返回给master,并保存为局部变量,c
.
我的描述对吗?如果没有,具体程序是什么?如果我是对的,那么并行化方法有什么意义呢?下面的代码是否和上面列出的一样?
a = sc.textFile(filename).collect()
b = sc.parallelize(a).filter(lambda x: len(x)>0 and x.split("\t").count("9999-12-31"))
c = b.collect()
2。对于下面的代码,SQL查询语法是否会通过将定义的table分成许多分区来并行处理?
a = sc.textFile(filename)
b = a.filter(lambda x: len(x) > 0 and x.split("\t").count("9999-12-31") == 0)
parts = b.map(lambda x: x.split("\t"))
records = parts.map(Row(r0 = str(x[0]), r1 = x[1], r2 = x[2]))
rTable = sqlContext.createDataFrame(records)
rTable.registerTempTable("rTable")
result = sqlContext.sql("select substr(r0,1,2), case when r1=1 then r1*100 else r1*10 end, r2 from rTable").collect()
您对第一步的描述是正确的。但是关于第二步和第三步还有更多内容。
第二步:
根据 Spark 文档:
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value.
如果您将文件放在 HDFS 中并将其路径作为 textFile
参数传递,RDD a
的分区将基于 HDFS 块创建。所以在这种情况下,腭化的数量取决于 HDFS 块的数量。数据也已经分区并通过 HDFS 移动到集群机器上。
如果您使用本地文件系统上的路径(在所有节点上可用)并且不指定 minPartitions
,则选择默认并行度(取决于集群中的核心数)。在这种情况下,您必须将文件复制到每个工作人员或将其放入每个工作人员都可以使用的共享存储中。
在每种情况下,Spark 都避免广播任何数据,而是尝试使用每台机器中的现有块。所以你的第二步并不完全正确。
第三步
根据 Spark 文档:
collect(): Array[T]
Return an array that contains all of the elements in this RDD
在这一步中,您的 RDD b
shuffled/collected 进入您的驱动程序 program/node。
我是 Spark 的新手,正在尝试通过 Python 来理解 Spark 的概念。在使用 Python 为 Spark 开发应用程序时,我对以并行方式处理数据的方式感到有点困惑。
1。每个人都说我不需要担心在处理封装在 RDD 变量中的数据时会涉及到哪个节点和多少个节点。因此,根据我的最佳理解,我相信 Spark 集群会对以下代码执行的操作:
a = sc.textFile(filename)
b = a.filter(lambda x: len(x) > 0 and x.split("\t").count("9999-12-31") == 0)
c = b.collect()
可以描述为以下步骤:
(1)变量a
将保存为包含预期txt文件内容的RDD变量
(2) RDD的不同chunk a
会被广播到集群中的不同节点,并对不同节点中的每个chunk进行filter方法
(3)调用采集动作时,结果会从不同的节点返回给master,并保存为局部变量,c
.
我的描述对吗?如果没有,具体程序是什么?如果我是对的,那么并行化方法有什么意义呢?下面的代码是否和上面列出的一样?
a = sc.textFile(filename).collect()
b = sc.parallelize(a).filter(lambda x: len(x)>0 and x.split("\t").count("9999-12-31"))
c = b.collect()
2。对于下面的代码,SQL查询语法是否会通过将定义的table分成许多分区来并行处理?
a = sc.textFile(filename)
b = a.filter(lambda x: len(x) > 0 and x.split("\t").count("9999-12-31") == 0)
parts = b.map(lambda x: x.split("\t"))
records = parts.map(Row(r0 = str(x[0]), r1 = x[1], r2 = x[2]))
rTable = sqlContext.createDataFrame(records)
rTable.registerTempTable("rTable")
result = sqlContext.sql("select substr(r0,1,2), case when r1=1 then r1*100 else r1*10 end, r2 from rTable").collect()
您对第一步的描述是正确的。但是关于第二步和第三步还有更多内容。
第二步:
根据 Spark 文档:
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value.
如果您将文件放在 HDFS 中并将其路径作为 textFile
参数传递,RDD a
的分区将基于 HDFS 块创建。所以在这种情况下,腭化的数量取决于 HDFS 块的数量。数据也已经分区并通过 HDFS 移动到集群机器上。
如果您使用本地文件系统上的路径(在所有节点上可用)并且不指定 minPartitions
,则选择默认并行度(取决于集群中的核心数)。在这种情况下,您必须将文件复制到每个工作人员或将其放入每个工作人员都可以使用的共享存储中。
在每种情况下,Spark 都避免广播任何数据,而是尝试使用每台机器中的现有块。所以你的第二步并不完全正确。
第三步
根据 Spark 文档:
collect(): Array[T] Return an array that contains all of the elements in this RDD
在这一步中,您的 RDD b
shuffled/collected 进入您的驱动程序 program/node。