只有单线程使用多处理池对 PySpark 执行并行 SQL 查询

Only single thread executes parallel SQL query with PySpark using multiprocessing pool

我有一个案例,我正在使用 PySpark(如果我不能使用 Python 而需要使用 Scala 或 Java,则使用 Spark)从数百个数据库中提取数据table 缺少主键。 (为什么 Oracle 会创建一个包含 tables 和主键的 ERP 产品是一个不同的主题......但无论如何,我们需要能够从每个数据库中提取数据并保存数据 table 到 Parquet 文件中。)我最初尝试使用 Sqoop 而不是 PySpark,但由于我们 运行 遇到的一些问题,尝试使用 PySpark/Spark 更有意义。

理想情况下,我希望每个任务节点都在我的计算集群中:获取 table 的名称,从数据库中查询 table,然后保存 table 作为 S3 中的 Parquet 文件(或一组 Parquet 文件)。我的第一步是让它在本地以独立模式运行。 (如果每个给定 table 都有一个主键,那么我可以针对给定 table 将查询和文件保存过程分区到不同的行集,并将行分区分布到任务节点中计算集群并行执行文件保存操作,但由于 Oracle 的 ERP 产品缺少关注的 table 的主键,这不是一个选项。)

我能够使用 PySpark 成功查询目标数据库,并且能够使用多线程将数据成功保存到 parquet 文件中,但是由于某些原因,只有一个线程可以任何东西所以,发生的事情是只有一个线程获取一个table名称,查询数据库,并将文件作为Parquet文件保存到所需的目录。然后作业结束就好像没有其他线程被执行一样。我猜可能发生了某种类型的锁定问题。 如果我正确理解这里的评论:How to run multiple jobs in one Sparkcontext from separate threads in PySpark? 那么我尝试做的事情应该是可能的,除非存在与执行并行 JDBC SQL 查询相关的特定问题。

编辑:我正在专门寻找一种允许我使用某种类型的线程池的方法,这样我就不需要为每个线程池手动创建一个线程我需要处理的 table 之一,并在我的集群中的任务节点之间手动对它们进行负载平衡。

即使我尝试设置:

--master local[*]

--conf 'spark.scheduler.mode=FAIR'

问题依旧。

此外,为了简要说明我的代码,我需要使用自定义 JDBC 驱动程序,并且我 运行 在 Windows 上的 Jupyter 笔记本中编写代码,所以我正在使用一种变通方法来确保 PySpark 以正确的参数启动。 (郑重声明,我并不反对其他操作系统,但我的 Windows 机器是我最快的工作站,所以这就是我使用它的原因。)

这是我的设置:

driverPath = r'C:\src\NetSuiteJDBC\NQjc.jar'
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--driver-class-path '{0}' --jars '{0}' --master local[*] --conf 'spark.scheduler.mode=FAIR' --conf 'spark.scheduler.allocation.file=C:\src\PySparkConfigs\fairscheduler.xml' pyspark-shell".format(driverPath)
)

import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Column, Row, SQLContext
from pyspark.sql.functions import col, split, regexp_replace, when
from pyspark.sql.types import ArrayType, IntegerType, StringType

spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()
spark.sparkContext.setLogLevel("INFO")
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")
sc = SparkContext.getOrCreate()

然后,为了测试多处理,我在 运行 我的 Jupyter notebook 所在的目录中创建了文件 sparkMethods.py 并将此方法放入其中:

def testMe(x):
    return x*x

当我运行:

from multiprocessing import Pool
import sparkMethods

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    # print "[0, 1, 4,..., 81]"
    print(pool.map(sparkMethods.testMe, range(10)))

在我的 Jupyter 笔记本中,我得到了预期的输出:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

现在,在有人反对我编写下一个方法的方式之前,请知道我最初尝试通过闭包传递 spark 上下文,然后 运行 进入 Pickling 错误,如此处记录: 因此,我将所有 Spark 上下文包含在我放入 sparkMethods.py 文件的下一个方法中(至少在我找到更好的方法之前)。我将这些方法放入外部文件(而不是仅将它们包含在 Jupyter Notebook 中)的原因是为了处理这个问题:https://bugs.python.org/issue25053 正如这里所讨论的: 和这里: python multiprocessing: AttributeError: Can't get attribute "abc"

这是包含建立 JDBC 连接逻辑的方法:

# In sparkMethods.py file:
def getAndSaveTableInPySpark(tableName):
    import os
    import os.path
    from pyspark.sql import SparkSession, SQLContext
    spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()
    spark.sparkContext.setLogLevel("INFO")
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")

    jdbcDF = spark.read \
        .format("jdbc") \
        .option("url", "OURCONNECTIONURL;") \
        .option("driver", "com.netsuite.jdbc.openaccess.OpenAccessDriver") \
        .option("dbtable", tableName) \
        .option("user", "USERNAME") \
        .option("password", "PASSWORD") \
        .load()

    filePath = "C:\src\NetsuiteSparkProject\" + tableName + "\" + tableName + ".parquet"
    jdbcDF.write.parquet(filePath)
    fileExists = os.path.exists(filePath)
    if(fileExists):
        return (filePath + " exists!")
    else:
        return (filePath + " could not be written!")

然后,回到我的 Jupyter 笔记本,我 运行:

import sparkMethods
from multiprocessing import Pool

if __name__ == '__main__':
    with Pool(5) as p:
        p.map(sparkMethods.getAndSaveTableInPySpark, top5Tables)

问题是好像只有一个线程在执行。

当我执行它时,在控制台输出中,我看到它最初包括:

The process cannot access the file because it is being used by another process. The system cannot find the file C:\Users\DEVIN~1.BOS\AppData\Local\Temp\spark-class-launcher-output-3662.txt. . . .

这让我怀疑可能发生了某种类型的锁定。

无论如何,其中一个线程总是会 运行 成功完成并成功查询其对应的 table 并根据需要将其保存到 Parquet 文件中。该过程中存在一些不确定性,因为不同的执行会导致不同的线程赢得比赛并因此处理不同的 table。 有趣的是,只有一个作业被执行,如 Spark UI 所示: 但是,这里的文章:https://medium.com/@rbahaguejr/threaded-tasks-in-pyspark-jobs-d5279844dac0 暗示如果成功启动,我应该期望在 Spark UI 中看到多个作业。

现在,如果问题是 PySpark 实际上无法 运行 跨不同任务节点并行执行多个 JDBC 查询,那么也许我的解决方案是使用 JDBC 连接池,甚至只是为每个 table 打开一个连接(只要我在线程末尾关闭连接)。 当获取要处理的 table 列表时,我成功地通过 jaydebeapi 库连接到数据库,如下所示:

import jaydebeapi
conn = jaydebeapi.connect("com.netsuite.jdbc.openaccess.OpenAccessDriver",  
                          "OURCONNECTIONURL;", 
                          ["USERNAME", "PASSWORD"], 
                          r"C:\src\NetSuiteJDBC\NQjc.jar")

top5Tables = list(pd.read_sql("SELECT TOP 5 TABLE_NAME FROM OA_TABLES WHERE TABLE_OWNER != 'SYSTEM';", conn)["TABLE_NAME"].values)
conn.close()
top5Tables

输出为:

['SALES_TERRITORY_PLAN_PARTNER',
 'WORK_ORDER_SCHOOLS_TO_INSTALL_MAP',
 'ITEM_ACCOUNT_MAP',
 'PRODUCT_TRIAL_STATUS',
 'ACCOUNT_PERIOD_ACTIVITY']

因此,可以想象,如果问题是 PySpark 不能像这样用于跨任务节点分发多个查询,那么也许我可以使用 jaydebeapi 库来建立连接。但是,在那种情况下,我仍然需要一种方法能够将 JDBC SQL 查询的输出写入 Parquet 文件(理想情况下会利用 Spark 的模式推理功能),但我如果可行,我愿意采用这种方法。

那么,如何在主节点不按顺序执行所有查询的情况下成功查询数据库并将输出并行保存到 Parquet 文件(即分布在任务节点上)?

在回答我的问题时评论提供了一些提示,以及这里的答案: 我研究了线程而不是多处理的使用。 我更仔细地查看了这里的一个答案:How to run multiple jobs in one Sparkcontext from separate threads in PySpark? 并注意到使用:

from multiprocessing.pool import ThreadPool

我能够像这样让它工作:

from multiprocessing.pool import ThreadPool
pool = ThreadPool(5)
results = pool.map(sparkMethods.getAndSaveTableInPySpark, top5Tables)
pool.close() 
pool.join() 
print(*results, sep='\n')

打印:

C:\src\NetsuiteSparkProject\SALES_TERRITORY_PLAN_PARTNER\SALES_TERRITORY_PLAN_PARTNER.parquet exists!
C:\src\NetsuiteSparkProject\WORK_ORDER_SCHOOLS_TO_INSTALL_MAP\WORK_ORDER_SCHOOLS_TO_INSTALL_MAP.parquet exists!
C:\src\NetsuiteSparkProject\ITEM_ACCOUNT_MAP\ITEM_ACCOUNT_MAP.parquet exists!
C:\src\NetsuiteSparkProject\PRODUCT_TRIAL_STATUS\PRODUCT_TRIAL_STATUS.parquet exists!
C:\src\NetsuiteSparkProject\ACCOUNT_PERIOD_ACTIVITY\ACCOUNT_PERIOD_ACTIVITY.parquet exists!

基本上,Spark 负责底层的并行化,不需要使用 multiprocessing 包,事实上它可能会干扰 Spark,而且完全没有必要。但是必须做一些事情才能利用这一点。关键是首先构建查询和转换,但不要执行任何操作。还要确保您的 spark 集群设置有多个工作节点,工作会分发到这些节点。一个简单的方法是使用 DataBricks notebooks 或大型云供应商提供的其他服务,它们会为您设置所有这些。

Spark 有两种模式。 TRANSFORMATIONS(不执行任何操作,只是简单地设置查询和转换,类似于 SQL)。以及实际执行查询并对结果采取行动的 ACTIONS。 count() 是一个动作。 show() 是一个动作。查询是一种转换,table 添加是一种转换。

要使用 Spark 内置的固有并行性,请在 Spark 中将多个查询和转换写入不同的 table,但不要 collect()count()show()结果(此时不要执行任何操作,仅执行转换)。这将在内部安排查询但不会执行它们(这是 sparks 惰性模式)。

然后在代码的后面,当您 运行 一个动作(如计数、显示或收集)时,它会自动将工作并行分配给所有可用节点。这就是火花之美。您的本地设备不需要特殊的多处理,它都由 Spark 处理。

这是一个 pySpark 示例:

    # First build the queries but don't collect any data.
        part1_sdf = spark.sql(
          "SELECT UtcTime, uSecDelay, sender, Recipient, date , ID "
          "FROM Delay_table "
          "WHERE date between DATE_ADD(now(), - 60) AND DATE_ADD(now(), -59) "
          "AND ID = 'my_id' "
          "ORDER BY UtcTime DESC "
        )
        part2_sdf = spark.sql(
          "SELECT UtcTime, uSecDelay, sender, Recipient, date, ID "
          "FROM Delay_table "
          "WHERE date between DATE_ADD(now(), -58) AND DATE_ADD(now(), -57) "
          "AND ID = 'my_id' "
          "ORDER BY UtcTime DESC "
        )
        # Peform a Transformation on the 2 queries.  No data is pulled up to this point
        transformed_df = part1_sdf.union(part2_sdf)
        # Finally when an action is called, the data is pulled in parallel:
        transformed_df.show(10)
        ### Output
        +--------------------------+--------------------------------------+-----------------+--------------------+----------+--------+
        |UtcTime|                   uSecDelay|                              
 sender|Recipient|                    date|                                 ID|
        +--------------------------+--------------------------------------+-----------------+--------------------+----------+--------+
        |      2020-01-05 01:39:...|                                    69|                4|                  28|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    65|                4|                  26|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    62|                4|                   0|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                   108|                4|                  16|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    68|                4|                  27|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    71|                4|                  53|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    68|                4|                   7|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    65|                4|                  57|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    64|                4|                  56|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    66|                4|                  44|2020-01-05|  my_id|
        +--------------------------+--------------------------------------+-----------------+--------------------+----------+--------+
        only showing top 10 rows