如何使用正确的名称在 scala 中划分 table
How to partition a table in scala with the proper name
我在 scala 2.4.0 中有一个大型 Dataframe,看起来像这样
+--------------------+--------------------+--------------------+-------------------+--------------+------+
| cookie| updated_score| probability| date_last_score|partition_date|target|
+--------------------+--------------------+--------------------+-------------------+--------------+------+
|00000000000001074780| 0.1110987111481027| 0.27492987342938174|2019-03-29 16:00:00| 2019-04-07_10| 0|
|00000000000001673799| 0.02621894072693878| 0.2029688362968775|2019-03-19 08:00:00| 2019-04-07_10| 0|
|00000000000002147908| 0.18922034021212567| 0.3520678649755828|2019-03-31 19:00:00| 2019-04-09_12| 1|
|00000000000004028302| 0.06803669083452231| 0.23089047208736854|2019-03-25 17:00:00| 2019-04-07_10| 0|
和这个架构:
root
|-- cookie: string (nullable = true)
|-- updated_score: double (nullable = true)
|-- probability: double (nullable = true)
|-- date_last_score: string (nullable = true)
|-- partition_date: string (nullable = true)
|-- target: integer (nullable = false)
然后我创建一个分区 table 并将数据插入 database.table_name。但是当我查看配置单元数据库并键入:show partitions database.table_name 我只得到 partition_date=0 和 partition_date=1,而 0 和 1 不是来自 partition_date 的值列。
不知道是不是我写错了,有些scala概念没看懂或者dataframe太大
我尝试了不同的方法来查找类似的问题:
result_df.write.mode(SaveMode.Overwrite).insertInto("table_name")
或
result_df.write.mode(SaveMode.Overwrite).saveAsTable("table_name")
如果有帮助,我提供一些来自 scala 的 INFO 消息:
查看此消息,我认为我的 result_df 分区正确。
19/07/31 07:53:57 INFO TaskSetManager: Starting task 11.0 in stage 2822.0 (TID 123456, ip-xx-xx-xx.aws.local.somewhere, executor 45, partition 11, PROCESS_LOCAL, 7767 bytes)
19/07/31 07:53:57 INFO TaskSetManager: Starting task 61.0 in stage 2815.0 (TID 123457, ip-xx-xx-xx-xyz.aws.local.somewhere, executor 33, partition 61, NODE_LOCAL, 8095 bytes)
然后,我开始将分区保存为Vector(0, 1, 2...),但我只能保存0和1?我真的不知道。
19/07/31 07:56:02 INFO DAGScheduler: Submitting 35 missing tasks from ShuffleMapStage 2967 (MapPartitionsRDD[130590] at insertInto at evaluate_decay_factor.scala:165) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
19/07/31 07:56:02 INFO YarnScheduler: Adding task set 2967.0 with 35 tasks
19/07/31 07:56:02 INFO DAGScheduler: Submitting ShuffleMapStage 2965 (MapPartitionsRDD[130578] at insertInto at evaluate_decay_factor.scala:165), which has no missing parents
我的代码如下所示:
val createTableSQL = s"""
CREATE TABLE IF NOT EXISTS table_name (
cookie string,
updated_score float,
probability float,
date_last_score string,
target int
)
PARTITIONED BY (partition_date string)
STORED AS PARQUET
TBLPROPERTIES ('PARQUET.COMPRESSION'='SNAPPY')
"""
spark.sql(createTableSQL)
result_df.write.mode(SaveMode.Overwrite).insertInto("table_name")
给定这样的数据框:
val result = Seq(
(8, "123", 1.2, 0.5, "bat", "2019-04-04_9"),
(64, "451", 3.2, -0.5, "mouse", "2019-04-04_12"),
(-27, "613", 8.2, 1.5, "horse", "2019-04-04_10"),
(-37, "513", 4.33, 2.5, "horse", "2019-04-04_11"),
(45, "516", -3.3, 3.4, "bat", "2019-04-04_10"),
(12, "781", 1.2, 5.5, "horse", "2019-04-04_11")
我想 运行:在配置单元命令行上显示分区 "table_name" 并得到:
partition_date=2019-04-04_9
partition_date=2019-04-04_10
partition_date=2019-04-04_11
partition_date=2019-04-04_12
我的输出是:
partition_date=0
partition_date=1
在这个简单的示例中,它工作得很好,但是对于我的大数据框,我得到了以前的输出。
要更改分区数,请使用 repartition(numOfPartitions)
要在写入时更改分区依据的列,请使用 partitionBy("col")
一起使用的例子:final_df.repartition(40).write.partitionBy("txnDate").mode("append").parquet(destination)
两个有用的提示:
- 使您的重新分区大小等于工作内核的数量,以实现最快的写入/重新分区。在这个例子中,我有 10 个执行器,每个执行器有 4 个核心(总共 40 个核心)。因此,我将其设置为 40.
- 当您写入目的地时,除了子存储桶之外,不要指定任何内容——让 spark 处理索引。
- 好的目的地:
"s3a://prod/subbucket/"
- 错误的目的地:
s"s3a://prod/subbucket/txndate=$txndate"
我在 scala 2.4.0 中有一个大型 Dataframe,看起来像这样
+--------------------+--------------------+--------------------+-------------------+--------------+------+
| cookie| updated_score| probability| date_last_score|partition_date|target|
+--------------------+--------------------+--------------------+-------------------+--------------+------+
|00000000000001074780| 0.1110987111481027| 0.27492987342938174|2019-03-29 16:00:00| 2019-04-07_10| 0|
|00000000000001673799| 0.02621894072693878| 0.2029688362968775|2019-03-19 08:00:00| 2019-04-07_10| 0|
|00000000000002147908| 0.18922034021212567| 0.3520678649755828|2019-03-31 19:00:00| 2019-04-09_12| 1|
|00000000000004028302| 0.06803669083452231| 0.23089047208736854|2019-03-25 17:00:00| 2019-04-07_10| 0|
和这个架构:
root
|-- cookie: string (nullable = true)
|-- updated_score: double (nullable = true)
|-- probability: double (nullable = true)
|-- date_last_score: string (nullable = true)
|-- partition_date: string (nullable = true)
|-- target: integer (nullable = false)
然后我创建一个分区 table 并将数据插入 database.table_name。但是当我查看配置单元数据库并键入:show partitions database.table_name 我只得到 partition_date=0 和 partition_date=1,而 0 和 1 不是来自 partition_date 的值列。
不知道是不是我写错了,有些scala概念没看懂或者dataframe太大
我尝试了不同的方法来查找类似的问题:
result_df.write.mode(SaveMode.Overwrite).insertInto("table_name")
或
result_df.write.mode(SaveMode.Overwrite).saveAsTable("table_name")
如果有帮助,我提供一些来自 scala 的 INFO 消息:
查看此消息,我认为我的 result_df 分区正确。
19/07/31 07:53:57 INFO TaskSetManager: Starting task 11.0 in stage 2822.0 (TID 123456, ip-xx-xx-xx.aws.local.somewhere, executor 45, partition 11, PROCESS_LOCAL, 7767 bytes)
19/07/31 07:53:57 INFO TaskSetManager: Starting task 61.0 in stage 2815.0 (TID 123457, ip-xx-xx-xx-xyz.aws.local.somewhere, executor 33, partition 61, NODE_LOCAL, 8095 bytes)
然后,我开始将分区保存为Vector(0, 1, 2...),但我只能保存0和1?我真的不知道。
19/07/31 07:56:02 INFO DAGScheduler: Submitting 35 missing tasks from ShuffleMapStage 2967 (MapPartitionsRDD[130590] at insertInto at evaluate_decay_factor.scala:165) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
19/07/31 07:56:02 INFO YarnScheduler: Adding task set 2967.0 with 35 tasks
19/07/31 07:56:02 INFO DAGScheduler: Submitting ShuffleMapStage 2965 (MapPartitionsRDD[130578] at insertInto at evaluate_decay_factor.scala:165), which has no missing parents
我的代码如下所示:
val createTableSQL = s"""
CREATE TABLE IF NOT EXISTS table_name (
cookie string,
updated_score float,
probability float,
date_last_score string,
target int
)
PARTITIONED BY (partition_date string)
STORED AS PARQUET
TBLPROPERTIES ('PARQUET.COMPRESSION'='SNAPPY')
"""
spark.sql(createTableSQL)
result_df.write.mode(SaveMode.Overwrite).insertInto("table_name")
给定这样的数据框:
val result = Seq(
(8, "123", 1.2, 0.5, "bat", "2019-04-04_9"),
(64, "451", 3.2, -0.5, "mouse", "2019-04-04_12"),
(-27, "613", 8.2, 1.5, "horse", "2019-04-04_10"),
(-37, "513", 4.33, 2.5, "horse", "2019-04-04_11"),
(45, "516", -3.3, 3.4, "bat", "2019-04-04_10"),
(12, "781", 1.2, 5.5, "horse", "2019-04-04_11")
我想 运行:在配置单元命令行上显示分区 "table_name" 并得到:
partition_date=2019-04-04_9
partition_date=2019-04-04_10
partition_date=2019-04-04_11
partition_date=2019-04-04_12
我的输出是:
partition_date=0
partition_date=1
在这个简单的示例中,它工作得很好,但是对于我的大数据框,我得到了以前的输出。
要更改分区数,请使用 repartition(numOfPartitions)
要在写入时更改分区依据的列,请使用 partitionBy("col")
一起使用的例子:final_df.repartition(40).write.partitionBy("txnDate").mode("append").parquet(destination)
两个有用的提示:
- 使您的重新分区大小等于工作内核的数量,以实现最快的写入/重新分区。在这个例子中,我有 10 个执行器,每个执行器有 4 个核心(总共 40 个核心)。因此,我将其设置为 40.
- 当您写入目的地时,除了子存储桶之外,不要指定任何内容——让 spark 处理索引。
- 好的目的地:
"s3a://prod/subbucket/"
- 错误的目的地:
s"s3a://prod/subbucket/txndate=$txndate"
- 好的目的地: