如何查看为使用 spark 从数据库并行读取每个分区的数据而生成的多个查询

how can I see the multiple queries that gets generated for reading data for each partition in parallel from database using spark

我正在尝试使用 Spark 从 Postgres table 读取数据。最初我是在不使用 lowerBoundupperBoundpartitionColumnnumPartitions 的情况下在单线程上读取数据。我正在阅读的数据非常庞大,大约有 1.2 亿条记录。所以我决定使用 partitionColumn 并行读取数据。我能够读取数据,但 12 个并行线程读取它比单个线程读取它需要更多时间。我无法弄清楚如何查看为每个分区并行获取数据而生成的 12 SQL 个查询。

我使用的代码是:

val query = s"(select * from db.testtable) as testquery" 
val df = spark.read
    .format("jdbc")
    .option("url", jdbcurl)
    .option("dbtable", query)
    .option("partitionColumn","transactionbegin")
    .option("numPartitions",12) 
    .option("driver", "org.postgresql.Driver")
    .option("fetchsize", 50000)  
    .option("user","user")
    .option("password", "password")
    .option("lowerBound","2019-01-01 00:00:00")
    .option("upperBound","2019-12-31 23:59:00")
    .load
df.count()

我在哪里以及如何查看为在每个线程上并行读取数据而创建的 12 个并行查询?
我能够看到在 Spark UI 中创建了 12 个任务,但无法找到一种方法来查看生成了哪些单独的 12 个查询以从 Postgres table.[=16 并行获取数据=]

有什么办法可以降低过滤器,使其只读取今年的数据,在本例中为 2019 年。

它不完全是多个查询,但它实际上会显示 Spark 根据您的查询优化的执行计划。它可能并不完美,具体取决于您必须执行的阶段。

你可以用 DataFrame 的形式编写你的 dag,在实际调用一个动作之前,你可以在它上面使用 explain() 方法。阅读它可能具有挑战性,但它是颠倒的。阅读本文时,来源位于底部。如果您尝试阅读,它可能看起来有点不寻常,所以如果您是第一次阅读,请从基本的转换开始,然后逐步进行。

SQL 语句使用“信息”日志级别打印,see here. You need to change Spark's log level to "info" to see the SQL. Additionally it printed the where condition alone too as here。 您还可以使用 pg_stat_statements 视图查看 Postgresql 数据库中的 SQL,这需要安装单独的插件。有一种方法可以记录 SQLs 并将它们视为 mentioned here.

我怀疑您的并行度很慢,因为您的 table 的“transactionbegin”列上没有索引。必须为 partitionColumn 编制索引,否则它将在所有会阻塞的并行会话中扫描整个 table。