How to use Scala's case method in PySpark

In Scala, when I have an RDD built from a list like List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)), I want to compute the average of the numbers for each key.

Take a, for example: it appears 3 times and its values sum to 1 + 2 + 6 = 9, so the result I expect is (a, 3).
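
To make the expected output concrete, here is the same computation in plain Python (just an illustration, no Spark involved), accumulating a (sum, count) pair per key:

from collections import defaultdict

data = [("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)]
totals = defaultdict(lambda: (0, 0))  # key -> (sum, count)
for key, value in data:
    s, c = totals[key]
    totals[key] = (s + value, c + 1)

print({k: s / c for k, (s, c) in totals.items()})
# {'a': 3.0, 'b': 4.0}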

In Scala, I can write code like this:

// accumulate a (sum, count) pair per key
val newRdd = rdd.aggregateByKey((0, 0))(
  (t, v) => (t._1 + v, t._2 + 1),             // within a partition: add the value, bump the count
  (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)  // across partitions: merge the partial (sum, count) pairs
)
// divide sum by count to get the average per key
val result = newRdd.mapValues {
  case (num, count) => num / count
}

The resulting RDD then returns the result I expect.
But how do I express this case (num/count) in PySpark?

I tried:

avg_rdd_2 = avg_rdd_1.mapValues(lambda x, y : x / y)

But I get the error below.

21/12/24 01:27:02 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 6)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/rdd.py", line 2278, in <lambda>
    map_values_fn = lambda kv: (kv[0], f(kv[1]))
TypeError: <lambda>() missing 1 required positional argument: 'y'
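
The error comes from how mapValues calls your function: it passes the value as a single argument (here the whole (sum, count) tuple), and Python 3 lambdas cannot unpack a tuple in their parameter list, so the two-parameter lambda is left missing its second argument. A minimal sketch of the fix, reusing the avg_rdd_1 name from the attempt above:

# mapValues hands the function ONE argument: the value, i.e. the (sum, count) tuple,
# so index into the tuple instead of declaring two parameters
avg_rdd_2 = avg_rdd_1.mapValues(lambda x: x[0] / x[1])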

Suppose we have an RDD:

from pyspark.sql import SparkSession


spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)])

In your Scala example you first aggregate by key; the same can be done in PySpark with any of the following:

groupByKey

new_rdd = rdd.groupByKey().mapValues(lambda x: sum(x) / len(x))

print(new_rdd.collect())
# [('b', 4.0), ('a', 3.0)]                                                        
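
To see what groupByKey actually produces before the averaging, you can materialize the grouped values (the key order in the output may differ):

print(rdd.groupByKey().mapValues(list).collect())
# [('b', [3, 4, 5]), ('a', [1, 2, 6])]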

aggregateByKey

new_rdd = rdd.aggregateByKey(
    (0, 0),                                   # zero value: a (sum, count) pair
    lambda x, y: (x[0] + y, x[1] + 1),        # within a partition: add the value, bump the count
    lambda x, y: (x[0] + y[0], x[1] + y[1]),  # across partitions: merge the (sum, count) pairs
)
result = new_rdd.mapValues(lambda x: x[0] / x[1])  # x is the (sum, count) pair
print(result.collect())
# [('b', 4.0), ('a', 3.0)]                                                        
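
If you want something that reads closer to the Scala case (num, count) pattern, you can unpack the tuple inside a small named function instead of indexing (the helper name average is my own):

def average(sum_count):
    num, count = sum_count  # the tuple unpacking that case (num, count) does in Scala
    return num / count

result = new_rdd.mapValues(average)
print(result.collect())
# [('b', 4.0), ('a', 3.0)]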

reduceByKey

result = (
    rdd.mapValues(lambda x: (x, 1))
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    .mapValues(lambda x: x[0] / x[1])
)

print(result.collect())
# [('b', 4.0), ('a', 3.0)]
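
For completeness, if switching to the DataFrame API is an option, the same per-key average is a one-liner (a sketch, assuming the columns are named key and value; row order in the output may differ):

df = spark.createDataFrame(rdd, ["key", "value"])
df.groupBy("key").avg("value").show()
# +---+----------+
# |key|avg(value)|
# +---+----------+
# |  b|       4.0|
# |  a|       3.0|
# +---+----------+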