Pyspark:添加新列包含一个列中的值对应于另一列中满足指定条件的另一个值

Pyspark: Add new Column contain a value in a column counterpart another value in another column that meets a specified condition

添加新列包含一列中的值对应另一列中满足指定条件的另一个值 例如, 原DF如下:

    +-----+-----+-----+
    |col1 |col2 |col3 |
    +-----+-----+-----+
    |    A|   17|    1|
    |    A|   16|    2|
    |    A|   18|    2|
    |    A|   30|    3|
    |    B|   35|    1|
    |    B|   34|    2|
    |    B|   36|    2|
    |    C|   20|    1|
    |    C|   30|    1|
    |    C|   43|    1|
    +-----+-----+-----+ 

我需要为每个 col1 的组重复 col2 中与 col3 中的 1 对应的值。如果 col1 中的任何组在 col3 中有更多值 =1,则重复最小值 所需的 Df 如下:

    +----+----+----+----------+
    |col1|col2|col3|new_column|
    +----+----+----+----------+
    |   A|  17|   1|        17|
    |   A|  16|   2|        17|
    |   A|  18|   2|        17|
    |   A|  30|   3|        17|
    |   B|  35|   1|        35|
    |   B|  34|   2|        35|
    |   B|  36|   2|        35|
    |   C|  20|   1|        20|
    |   C|  30|   1|        20|
    |   C|  43|   1|        20|
    +----+----+----+----------+
df3=df.filter(df.col3==1)

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   B|  35|   1|
|   C|  20|   1|
|   C|  30|   1|
|   C|  43|   1|
|   A|  17|   1|
+----+----+----+


df3.createOrReplaceTempView("mytable")

为了获得 col2 的最小值,我遵循了这个 link

中接受的答案
df6=spark.sql("select col1, min(col2) as minimum from mytable group by col1 order by col1")

df6.show()
+----+-------+
|col1|minimum|
+----+-------+
|   A|     17|
|   B|     35|
|   C|     20|
+----+-------+

df_a=df.join(df6,['col1'],'leftouter')

+----+----+----+-------+
|col1|col2|col3|minimum|
+----+----+----+-------+
|   B|  35|   1|     35|
|   B|  34|   2|     35|
|   B|  36|   2|     35|
|   C|  20|   1|     20|
|   C|  30|   1|     20|
|   C|  43|   1|     20|
|   A|  17|   1|     17|
|   A|  16|   2|     17|
|   A|  18|   2|     17|
|   A|  30|   3|     17|
+----+----+----+-------+

还有比这个解决方案更好的方法吗?