How to use Scala's case method in pyspark
In Scala, when I have an RDD of a list like this:
List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6))
I want to compute the average of the numbers for each key. For example, the key "a" appears 3 times and its values sum to 1+2+6 = 9, so the result I expect is (a, 3).
In Scala, I can write code like this:
val newRdd = rdd.aggregateByKey((0, 0))(
  (t, v) => (t._1 + v, t._2 + 1),              // within a partition: add the value, bump the count
  (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)   // across partitions: merge the (sum, count) pairs
)
val result = newRdd.mapValues {
  case (num, count) => num / count
}
So the result RDD returns what I expect.
But how do I express this case (num, count) in pyspark?
I tried:
avg_rdd_2 = avg_rdd_1.mapValues(lambda x, y : x / y)
but I get the error below.
21/12/24 01:27:02 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 6)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
process()
File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
serializer.dump_stream(out_iter, outfile)
File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
return f(*args, **kwargs)
File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/rdd.py", line 2278, in
map_values_fn = lambda kv: (kv[0], f(kv[1]))
TypeError: () missing 1 required positional argument: 'y'
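The TypeError occurs because mapValues calls your function with a single argument, the value, so a two-parameter lambda never receives y. Python 3 also dropped tuple-parameter unpacking in lambdas (PEP 3113), so there is no direct lambda equivalent of Scala's case (num, count); index into the tuple instead. A minimal sketch, assuming avg_rdd_1 is the (sum, count) RDD produced by your aggregateByKey step:
avg_rdd_2 = avg_rdd_1.mapValues(lambda x: x[0] / x[1])  # x is the (sum, count) tuple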
Suppose we have an RDD:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)])
In your Scala example, you first aggregate by key. The same can be done with any of the following:
groupByKey
new_rdd = rdd.groupByKey().mapValues(lambda x: sum(x) / len(x))
print(new_rdd.collect())
# [('b', 4.0), ('a', 3.0)]
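Note that groupByKey shuffles every value to the reducers before averaging, while the aggregateByKey and reduceByKey versions below combine (sum, count) pairs map-side first. If you stay with groupByKey, here is a hedged variant using a hypothetical helper mean_of that sums and counts the grouped values in a single pass:
def mean_of(values):
    total, count = 0, 0
    for v in values:          # values is the iterable of all numbers for one key
        total += v
        count += 1
    return total / count

new_rdd = rdd.groupByKey().mapValues(mean_of)
print(new_rdd.collect())
# [('b', 4.0), ('a', 3.0)]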
aggregateByKey
new_rdd = rdd.aggregateByKey(
    (0, 0),                                   # zero value: (sum, count)
    lambda x, y: (x[0] + y, x[1] + 1),        # within a partition: add the value, bump the count
    lambda x, y: (x[0] + y[0], x[1] + y[1]),  # across partitions: merge the (sum, count) pairs
)
result = new_rdd.mapValues(lambda x: x[0] / x[1])
print(result.collect())
# [('b', 4.0), ('a', 3.0)]
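This aggregateByKey version is the direct translation of your Scala code. If you prefer the unpacking to read like Scala's case (num, count), a small sketch using a named function instead of a lambda, reusing new_rdd from just above:
def average(num_count):
    num, count = num_count    # tuple unpacking, like case (num, count) in Scala
    return num / count

result = new_rdd.mapValues(average)
print(result.collect())
# [('b', 4.0), ('a', 3.0)]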
reduceByKey
result = (
    rdd.mapValues(lambda x: (x, 1))                        # value -> (value, 1)
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))  # sum values and counts per key
    .mapValues(lambda x: x[0] / x[1])                      # average = sum / count
)
print(result.collect())
# [('b', 4.0), ('a', 3.0)]
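If you are open to the DataFrame API, a minimal sketch computing the same per-key average, assuming the same spark session and rdd as above:
df = spark.createDataFrame(rdd, ["key", "value"])
df.groupBy("key").avg("value").show()
# averages: 3.0 for key 'a' and 4.0 for key 'b'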