Spark 运行 多次查询数据库
Spark is running queries into database multiple times
我正在尝试使用以下代码加载数据集以激发:
Dataset<Row> dataset = spark.read().jdbc(RPP_CONNECTION_URL, creditoDia3, rppDBProperties));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, creditoDia2, rppDBProperties)));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, creditoDia, rppDBProperties)));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia3, rppDBProperties)));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia2, rppDBProperties)));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia,rppDBProperties)));
dataset = dataset.cache();
Long numberOfRowsProcessed = dataset.count();
所以在这 6 个会话访问我的数据库并提取数据集并计算行数之后,我就不需要再访问数据库了。但是在 运行 下面的代码之后:
dataset.createOrReplaceTempView("temp");
Dataset<Row> base = spark.sql(new StringBuilder()
.append("select ")
.append("TRANSACTION ")
.append("from temp ")
.append("where PAYMENT_METHOD in (1,2,3,4) ")
.append("and TRANSACTION_STATUS in ('A','B') ")
.toString()
);
base.createOrReplaceTempView("base");
但是,我实际看到的是再次触发 运行 查询,但这次附加了我在定义 Dataset<Row> base
时传递的过滤器。如您所见,我已经缓存了数据,但没有任何效果。
问题:是否可以将内存中的所有内容加载到 spark 中并使用缓存数据,查询 spark 而不是数据库?
从我的关系数据库中获取数据非常昂贵并且需要一段时间。
更新
我注意到 spark 在尝试执行时正在向数据库发送新查询
from base a
left join base b on on a.IDT_TRANSACTION = b.IDT_TRANSACTION and a.DATE = b.DATE
这是 spark 附加到查询的字符串(从数据库中捕获):
WHERE ("IDT_TRANSACTION_STATUS" IS NOT NULL) AND ("NUM_BIN_CARD" IS NOT NULL)
日志中出现:
18/01/16 14:22:20 INFO DAGScheduler: ShuffleMapStage 12 (show at
RelatorioBinTransacao.java:496) finished in 13,046 s 18/01/16 14:22:20
INFO DAGScheduler: looking for newly runnable stages 18/01/16 14:22:20
INFO DAGScheduler: running: Set(ShuffleMapStage 9) 18/01/16 14:22:20
INFO DAGScheduler: waiting: Set(ShuffleMapStage 13, ShuffleMapStage
10, ResultStage 14, ShuffleMapStage 11) 18/01/16 14:22:20 INFO
DAGScheduler: failed: Set()
我不确定我是否明白我想说的话,但我认为我的记忆中缺少某些东西。
如果我只是像这样在左侧加入评论:
from base a
//left join base b on on a.IDT_TRANSACTION = b.IDT_TRANSACTION and a.DATE = b.DATE
它工作正常,不再进入数据库。
听起来您可能没有足够的内存来在集群上存储联合结果。在 Long numberOfRowsProcessed = dataset.count();
之后,请查看您的 Spark UI 的存储选项卡,查看整个数据集是否已完全缓存。如果不是,那么您需要更多内存(and/or 磁盘 space)。
如果您确认数据集确实被缓存,那么请post查询计划(例如base.explain()
)。
我找到了解决问题的方法。我必须在向数据库发送查询的每一行中添加一条 cache()
指令。所以它看起来像这样:
Dataset<Row> dataset = spark.read().jdbc(RPP_CONNECTION_URL, fake, rppDBProperties);
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, creditoDia3, rppDBProperties).cache());
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, creditoDia2, rppDBProperties).cache());
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, creditoDia, rppDBProperties).cache());
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia3, rppDBProperties).cache());
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia2, rppDBProperties).cache());
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia,rppDBProperties).cache());
dataset = dataset.cache();
我只好加了第一行fake
sql,因为无论我做什么,spark似乎都没有考虑缓存第一个查询,所以我一直看到第一个查询被发送到数据库。
最重要的是,如果我已经在末尾添加了 cache()
指令,我不明白为什么还要在每一行中添加一条指令。但是,它奏效了。
我正在尝试使用以下代码加载数据集以激发:
Dataset<Row> dataset = spark.read().jdbc(RPP_CONNECTION_URL, creditoDia3, rppDBProperties));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, creditoDia2, rppDBProperties)));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, creditoDia, rppDBProperties)));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia3, rppDBProperties)));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia2, rppDBProperties)));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia,rppDBProperties)));
dataset = dataset.cache();
Long numberOfRowsProcessed = dataset.count();
所以在这 6 个会话访问我的数据库并提取数据集并计算行数之后,我就不需要再访问数据库了。但是在 运行 下面的代码之后:
dataset.createOrReplaceTempView("temp");
Dataset<Row> base = spark.sql(new StringBuilder()
.append("select ")
.append("TRANSACTION ")
.append("from temp ")
.append("where PAYMENT_METHOD in (1,2,3,4) ")
.append("and TRANSACTION_STATUS in ('A','B') ")
.toString()
);
base.createOrReplaceTempView("base");
但是,我实际看到的是再次触发 运行 查询,但这次附加了我在定义 Dataset<Row> base
时传递的过滤器。如您所见,我已经缓存了数据,但没有任何效果。
问题:是否可以将内存中的所有内容加载到 spark 中并使用缓存数据,查询 spark 而不是数据库?
从我的关系数据库中获取数据非常昂贵并且需要一段时间。
更新
我注意到 spark 在尝试执行时正在向数据库发送新查询
from base a
left join base b on on a.IDT_TRANSACTION = b.IDT_TRANSACTION and a.DATE = b.DATE
这是 spark 附加到查询的字符串(从数据库中捕获):
WHERE ("IDT_TRANSACTION_STATUS" IS NOT NULL) AND ("NUM_BIN_CARD" IS NOT NULL)
日志中出现:
18/01/16 14:22:20 INFO DAGScheduler: ShuffleMapStage 12 (show at RelatorioBinTransacao.java:496) finished in 13,046 s 18/01/16 14:22:20 INFO DAGScheduler: looking for newly runnable stages 18/01/16 14:22:20 INFO DAGScheduler: running: Set(ShuffleMapStage 9) 18/01/16 14:22:20 INFO DAGScheduler: waiting: Set(ShuffleMapStage 13, ShuffleMapStage 10, ResultStage 14, ShuffleMapStage 11) 18/01/16 14:22:20 INFO DAGScheduler: failed: Set()
我不确定我是否明白我想说的话,但我认为我的记忆中缺少某些东西。
如果我只是像这样在左侧加入评论:
from base a
//left join base b on on a.IDT_TRANSACTION = b.IDT_TRANSACTION and a.DATE = b.DATE
它工作正常,不再进入数据库。
听起来您可能没有足够的内存来在集群上存储联合结果。在 Long numberOfRowsProcessed = dataset.count();
之后,请查看您的 Spark UI 的存储选项卡,查看整个数据集是否已完全缓存。如果不是,那么您需要更多内存(and/or 磁盘 space)。
如果您确认数据集确实被缓存,那么请post查询计划(例如base.explain()
)。
我找到了解决问题的方法。我必须在向数据库发送查询的每一行中添加一条 cache()
指令。所以它看起来像这样:
Dataset<Row> dataset = spark.read().jdbc(RPP_CONNECTION_URL, fake, rppDBProperties);
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, creditoDia3, rppDBProperties).cache());
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, creditoDia2, rppDBProperties).cache());
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, creditoDia, rppDBProperties).cache());
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia3, rppDBProperties).cache());
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia2, rppDBProperties).cache());
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia,rppDBProperties).cache());
dataset = dataset.cache();
我只好加了第一行fake
sql,因为无论我做什么,spark似乎都没有考虑缓存第一个查询,所以我一直看到第一个查询被发送到数据库。
最重要的是,如果我已经在末尾添加了 cache()
指令,我不明白为什么还要在每一行中添加一条指令。但是,它奏效了。