如何查看为使用 spark 从数据库并行读取每个分区的数据而生成的多个查询
how can I see the multiple queries that gets generated for reading data for each partition in parallel from database using spark
我正在尝试使用 Spark 从 Postgres table 读取数据。最初我是在不使用 lowerBound
、upperBound
、partitionColumn
和 numPartitions
的情况下在单线程上读取数据。我正在阅读的数据非常庞大,大约有 1.2 亿条记录。所以我决定使用 partitionColumn
并行读取数据。我能够读取数据,但 12 个并行线程读取它比单个线程读取它需要更多时间。我无法弄清楚如何查看为每个分区并行获取数据而生成的 12 SQL 个查询。
我使用的代码是:
val query = s"(select * from db.testtable) as testquery"
val df = spark.read
.format("jdbc")
.option("url", jdbcurl)
.option("dbtable", query)
.option("partitionColumn","transactionbegin")
.option("numPartitions",12)
.option("driver", "org.postgresql.Driver")
.option("fetchsize", 50000)
.option("user","user")
.option("password", "password")
.option("lowerBound","2019-01-01 00:00:00")
.option("upperBound","2019-12-31 23:59:00")
.load
df.count()
我在哪里以及如何查看为在每个线程上并行读取数据而创建的 12 个并行查询?
我能够看到在 Spark UI 中创建了 12 个任务,但无法找到一种方法来查看生成了哪些单独的 12 个查询以从 Postgres table.[=16 并行获取数据=]
有什么办法可以降低过滤器,使其只读取今年的数据,在本例中为 2019 年。
它不完全是多个查询,但它实际上会显示 Spark 根据您的查询优化的执行计划。它可能并不完美,具体取决于您必须执行的阶段。
你可以用 DataFrame 的形式编写你的 dag,在实际调用一个动作之前,你可以在它上面使用 explain()
方法。阅读它可能具有挑战性,但它是颠倒的。阅读本文时,来源位于底部。如果您尝试阅读,它可能看起来有点不寻常,所以如果您是第一次阅读,请从基本的转换开始,然后逐步进行。
SQL 语句使用“信息”日志级别打印,see here. You need to change Spark's log level to "info" to see the SQL. Additionally it printed the where condition alone too as here。
您还可以使用 pg_stat_statements
视图查看 Postgresql 数据库中的 SQL,这需要安装单独的插件。有一种方法可以记录 SQLs 并将它们视为 mentioned here.
我怀疑您的并行度很慢,因为您的 table 的“transactionbegin”列上没有索引。必须为 partitionColumn
编制索引,否则它将在所有会阻塞的并行会话中扫描整个 table。
我正在尝试使用 Spark 从 Postgres table 读取数据。最初我是在不使用 lowerBound
、upperBound
、partitionColumn
和 numPartitions
的情况下在单线程上读取数据。我正在阅读的数据非常庞大,大约有 1.2 亿条记录。所以我决定使用 partitionColumn
并行读取数据。我能够读取数据,但 12 个并行线程读取它比单个线程读取它需要更多时间。我无法弄清楚如何查看为每个分区并行获取数据而生成的 12 SQL 个查询。
我使用的代码是:
val query = s"(select * from db.testtable) as testquery"
val df = spark.read
.format("jdbc")
.option("url", jdbcurl)
.option("dbtable", query)
.option("partitionColumn","transactionbegin")
.option("numPartitions",12)
.option("driver", "org.postgresql.Driver")
.option("fetchsize", 50000)
.option("user","user")
.option("password", "password")
.option("lowerBound","2019-01-01 00:00:00")
.option("upperBound","2019-12-31 23:59:00")
.load
df.count()
我在哪里以及如何查看为在每个线程上并行读取数据而创建的 12 个并行查询?
我能够看到在 Spark UI 中创建了 12 个任务,但无法找到一种方法来查看生成了哪些单独的 12 个查询以从 Postgres table.[=16 并行获取数据=]
有什么办法可以降低过滤器,使其只读取今年的数据,在本例中为 2019 年。
它不完全是多个查询,但它实际上会显示 Spark 根据您的查询优化的执行计划。它可能并不完美,具体取决于您必须执行的阶段。
你可以用 DataFrame 的形式编写你的 dag,在实际调用一个动作之前,你可以在它上面使用 explain()
方法。阅读它可能具有挑战性,但它是颠倒的。阅读本文时,来源位于底部。如果您尝试阅读,它可能看起来有点不寻常,所以如果您是第一次阅读,请从基本的转换开始,然后逐步进行。
SQL 语句使用“信息”日志级别打印,see here. You need to change Spark's log level to "info" to see the SQL. Additionally it printed the where condition alone too as here。
您还可以使用 pg_stat_statements
视图查看 Postgresql 数据库中的 SQL,这需要安装单独的插件。有一种方法可以记录 SQLs 并将它们视为 mentioned here.
我怀疑您的并行度很慢,因为您的 table 的“transactionbegin”列上没有索引。必须为 partitionColumn
编制索引,否则它将在所有会阻塞的并行会话中扫描整个 table。