How does Apache Spark allocate tasks in the following scenario with mapPartitions?
Given the following Apache Spark (Python) code (which runs as expected):
import sys
from random import random
from operator import add
import sqlite3
from datetime import date
from datetime import datetime

from pyspark import SparkContext


def agePartition(recs):
    gconn = sqlite3.connect('/home/chris/test.db')
    myc = gconn.cursor()
    today = date.today()
    return_part = []
    for rec in recs:
        sql = "select birth_date from peeps where name = '{n}'".format(n=rec[0])
        myc.execute(sql)
        bdrec = myc.fetchone()
        born = datetime.strptime(bdrec[0], '%Y-%m-%d')
        return_part.append((rec[0], today.year - born.year - ((today.month, today.day) < (born.month, born.day))))
    gconn.close()
    return iter(return_part)


if __name__ == "__main__":
    """
    Usage: pi [partitions]
    """
    sc = SparkContext(appName="PythonDBTEST")
    print('starting...')

    data = [('Chris', 1), ('Amanda', 2), ('Shiloh', 2), ('Sammy', 2), ('Tim', 1)]
    rdd = sc.parallelize(data, 5)
    rslt_collect = rdd.mapPartitions(agePartition).collect()

    for x in rslt_collect:
        print("{n} is {a}".format(n=x[0], a=x[1]))

    sc.stop()
In a setup with two compute/slave nodes and 8 CPUs in total, will each partition be created as a task and distributed across the two nodes so that all 5 partitions run in parallel? If not, what else needs to be done to make sure that happens?
The purpose here is to test keeping a global database connection alive per slave worker process so that the database connection does not have to be reopened for every record processed in the RDD. I'm using SQLite in this example, but it will be a SQLCipher database, and opening that database connection is much more expensive.
Assuming you have 8 slots (CPUs) available in the cluster, you can process at most 8 partitions concurrently. In your case you have 5 partitions, so all of them should be processed in parallel. That amounts to 5 concurrent connections to the database.
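As a quick sanity check (a minimal sketch of my own, not from the original post), you can print the partition count and the default parallelism the context reports, and tag each record with its partition index to see how the 5 records were split across tasks. It assumes the same `sc` and `rdd` as in the code above:

# Hypothetical check, reusing `sc` and `rdd` from the question's code.
print(rdd.getNumPartitions())   # 5 partitions -> 5 tasks for the mapPartitions stage
print(sc.defaultParallelism)    # typically the total number of registered cores (8 here)

# Each task processes exactly one partition; tagging results with the partition
# index shows how the 5 records were distributed.
print(rdd.mapPartitionsWithIndex(
    lambda idx, it: [(idx, rec[0]) for rec in it]).collect())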
My expectation would be one per core so that if the number of records were much greater I would not be continually recreating database connections.
In your case it will be per partition. If you have 20 partitions and 8 cores, you will still create the connection 20 times.
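One way to get closer to one connection per worker process rather than per partition (a sketch under my own assumptions, not code from the original post; `_gconn`, `get_conn`, and `agePartitionReuse` are hypothetical names) is to cache the connection in a module-level global so it is created lazily on the first task and reused by later tasks on the same Python worker. This relies on spark.python.worker.reuse being enabled (it is by default), so the worker process survives between tasks:

import sqlite3
from datetime import date, datetime

_gconn = None  # hypothetical per-worker cache; lives as long as the Python worker process


def get_conn():
    # Open the database only on the first task that runs in this worker process.
    global _gconn
    if _gconn is None:
        _gconn = sqlite3.connect('/home/chris/test.db')
    return _gconn


def agePartitionReuse(recs):
    # Same per-record age logic as agePartition, but the connection comes from the
    # worker-level cache instead of being opened and closed for every partition.
    # The query is parameterized here rather than built with str.format.
    myc = get_conn().cursor()
    today = date.today()
    for rec in recs:
        myc.execute("select birth_date from peeps where name = ?", (rec[0],))
        born = datetime.strptime(myc.fetchone()[0], '%Y-%m-%d')
        yield (rec[0], today.year - born.year
               - ((today.month, today.day) < (born.month, born.day)))

With something like this, the number of sqlite3.connect calls should be bounded by the number of Python worker processes (roughly one per core in use), not by the number of partitions.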