按键值pyspark分组
group by key value pyspark
我正在尝试将值(键、值)与 apache spark (pyspark) 分组。
我设法通过键进行分组,但在内部我想对值进行分组,如下例所示。
我需要通过 cout() 对 GYEAR 列进行分组。
%pyspark
rdd1 = sc.textFile("/datos/apat63_99.txt")
rdd2 = rdd1.map(lambda line : line.split(",") ).map(lambda l : (l[4],l[1],l[0]))
for line in rdd2.take(6):
print(line)
######################
rdd3 = rdd2.map(lambda line:(line[0],(line[1:]) ))
rddx = rdd2.groupByKey()
rddx.take(5)
我希望输出为:
输入:
(u'"COUNTRY"', u'"GYEAR"', u'"PATENT"')
(u'"BE"', u'1963', u'3070801')
(u'"BE"', u'1964', u'3070811')
(u'"US"', u'1963', u'3070802')
(u'"US"', u'1963', u'3070803')
(u'"US"', u'1963', u'3070804')
(u'"US"', u'1963', u'3070805')
(u'"US"', u'1964', u'3070807')
输出:
(u'"BE"', [(u'1963', 1), (u'1964', 1)])
(u'"US"', [(u'1963', 4), (u'1964', 2)])
是您要查找的内容吗?我无法将结果列转换为元组,因此只能连接为字符串。
以下解决方案的性能可能不佳。
运行 火花 2.3 Ubuntu 18.04
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.appName("test").enableHiveSupport().getOrCreate()
l = [
('BE', '1963', '3070801'),
('BE', '1964', '3070811'),
('S', '1963', '3070802'),
('S', '1963', '3070803'),
('S', '1963', '3070804'),
('S', '1963', '3070805'),
('S', '1964', '3070807')]
colmns = ['country', 'Gyear', 'Patient']
df=spark.createDataFrame(l, colmns)
df.show()
+-------+-----+-------+
|country|Gyear|Patient|
+-------+-----+-------+
| BE| 1963|3070801|
| BE| 1964|3070811|
| S| 1963|3070802|
| S| 1963|3070803|
| S| 1963|3070804|
| S| 1963|3070805|
| S| 1964|3070807|
+-------+-----+-------+
df1=df.groupBy("country","Gyear").agg(F.count("Patient").alias("Patient"))
df1.show()
+-------+-----+-------+
|country|Gyear|Patient|
+-------+-----+-------+
| S| 1963| 4|
| BE| 1963| 1|
| S| 1964| 1|
| BE| 1964| 1|
+-------+-----+-------+
df2=df1.withColumn('result',F.concat(F.lit('('),df1.Gyear,F.lit(','),df1.Patient,F.lit(')'))).drop("Gyear","Patient")
df2.show()
+-------+--------+
|country| result|
+-------+--------+
| S|(1963,4)|
| BE|(1963,1)|
| S|(1964,1)|
| BE|(1964,1)|
+-------+--------+
df2.groupBy("country").agg(F.collect_list("result")).show()
+-------+--------------------+
|country|collect_list(result)|
+-------+--------------------+
| S|[(1963,4), (1964,1)]|
| BE|[(1963,1), (1964,1)]|
+-------+--------------------+
正如@PIG 指出的那样,使用 DataFrame 比使用 RDD 更容易。
此外,我建议使用 create_map
、collect_list
和您自己的 UDF 到 combine_maps
。这应该允许您继续使用结构化数据。
df2=df1.withColumn('result',F.create_map(df1.Gyear, df1.Patient))
df2.show()
+-------+-----+-------+-----------+
|country|Gyear|Patient| result|
+-------+-----+-------+-----------+
| S| 1963| 4|[1963 -> 4]|
| BE| 1963| 1|[1963 -> 1]|
| S| 1964| 1|[1964 -> 1]|
| BE| 1964| 1|[1964 -> 1]|
+-------+-----+-------+-----------+
from typing import List, Dict
from pyspark.sql.functions import udf
from functools import reduce
from pyspark.sql.types import *
def combine_map(x: Dict[str, int], y: Dict[str, int]) -> Dict[str, int]:
return {k: x.get(k, 0) + y.get(k, 0) for k in set(x) | set(y)}
@udf(returnType=MapType(StringType(), IntegerType()))
def combine_maps(maps):
return reduce(combine_map, maps, {})
df2.groupBy("country").agg(F.collect_list("result").alias("result")) \
.withColumn("result", combine_maps("result")) \
.show(truncate=False)
+-------+----------------------+
|country|result |
+-------+----------------------+
|S |[1964 -> 1, 1963 -> 4]|
|BE |[1964 -> 1, 1963 -> 1]|
+-------+----------------------+
这是使用 RDD 方法的一种方式:
from operator import add
# initialize the RDD
rdd = sc.parallelize([(u'"COUNTRY"', u'"GYEAR"', u'"PATENT"')
, (u'"BE"', u'1963', u'3070801')
, (u'"BE"', u'1964', u'3070811')
, (u'"US"', u'1963', u'3070802')
, (u'"US"', u'1963', u'3070803')
, (u'"US"', u'1963', u'3070804')
, (u'"US"', u'1963', u'3070805')
, (u'"US"', u'1964', u'3070807')])
执行以下操作:
- 设置
(COUNTRY, GYEAR)
的元组为键,1
为值
- 使用 reduceByKey(add) 计算键数
- 调整key为
COUNTRY
,value为[(GYEAR, cnt)]
where cnt是由前面的reduceByKey 计算出来的
- 运行
reduceByKey(add)
用相同的键(COUNTRY
)合并列表。
使用过滤器删除 header
rdd_new = rdd.map(lambda x: ((x[0],x[1]), 1) ) \
.reduceByKey(add) \
.map(lambda x: (x[0][0], [(x[0][1],x[1])])) \
.reduceByKey(add) \
.filter(lambda x: x[0] != '"COUNTRY"')
查看结果:
>>> rdd_new.take(2)
[(u'"US"', [(u'1964', 1), (u'1963', 4)]),
(u'"BE"', [(u'1963', 1), (u'1964', 1)])]
我正在尝试将值(键、值)与 apache spark (pyspark) 分组。 我设法通过键进行分组,但在内部我想对值进行分组,如下例所示。
我需要通过 cout() 对 GYEAR 列进行分组。
%pyspark
rdd1 = sc.textFile("/datos/apat63_99.txt")
rdd2 = rdd1.map(lambda line : line.split(",") ).map(lambda l : (l[4],l[1],l[0]))
for line in rdd2.take(6):
print(line)
######################
rdd3 = rdd2.map(lambda line:(line[0],(line[1:]) ))
rddx = rdd2.groupByKey()
rddx.take(5)
我希望输出为:
输入:
(u'"COUNTRY"', u'"GYEAR"', u'"PATENT"')
(u'"BE"', u'1963', u'3070801')
(u'"BE"', u'1964', u'3070811')
(u'"US"', u'1963', u'3070802')
(u'"US"', u'1963', u'3070803')
(u'"US"', u'1963', u'3070804')
(u'"US"', u'1963', u'3070805')
(u'"US"', u'1964', u'3070807')
输出:
(u'"BE"', [(u'1963', 1), (u'1964', 1)])
(u'"US"', [(u'1963', 4), (u'1964', 2)])
是您要查找的内容吗?我无法将结果列转换为元组,因此只能连接为字符串。 以下解决方案的性能可能不佳。
运行 火花 2.3 Ubuntu 18.04
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.appName("test").enableHiveSupport().getOrCreate()
l = [
('BE', '1963', '3070801'),
('BE', '1964', '3070811'),
('S', '1963', '3070802'),
('S', '1963', '3070803'),
('S', '1963', '3070804'),
('S', '1963', '3070805'),
('S', '1964', '3070807')]
colmns = ['country', 'Gyear', 'Patient']
df=spark.createDataFrame(l, colmns)
df.show()
+-------+-----+-------+
|country|Gyear|Patient|
+-------+-----+-------+
| BE| 1963|3070801|
| BE| 1964|3070811|
| S| 1963|3070802|
| S| 1963|3070803|
| S| 1963|3070804|
| S| 1963|3070805|
| S| 1964|3070807|
+-------+-----+-------+
df1=df.groupBy("country","Gyear").agg(F.count("Patient").alias("Patient"))
df1.show()
+-------+-----+-------+
|country|Gyear|Patient|
+-------+-----+-------+
| S| 1963| 4|
| BE| 1963| 1|
| S| 1964| 1|
| BE| 1964| 1|
+-------+-----+-------+
df2=df1.withColumn('result',F.concat(F.lit('('),df1.Gyear,F.lit(','),df1.Patient,F.lit(')'))).drop("Gyear","Patient")
df2.show()
+-------+--------+
|country| result|
+-------+--------+
| S|(1963,4)|
| BE|(1963,1)|
| S|(1964,1)|
| BE|(1964,1)|
+-------+--------+
df2.groupBy("country").agg(F.collect_list("result")).show()
+-------+--------------------+
|country|collect_list(result)|
+-------+--------------------+
| S|[(1963,4), (1964,1)]|
| BE|[(1963,1), (1964,1)]|
+-------+--------------------+
正如@PIG 指出的那样,使用 DataFrame 比使用 RDD 更容易。
此外,我建议使用 create_map
、collect_list
和您自己的 UDF 到 combine_maps
。这应该允许您继续使用结构化数据。
df2=df1.withColumn('result',F.create_map(df1.Gyear, df1.Patient))
df2.show()
+-------+-----+-------+-----------+
|country|Gyear|Patient| result|
+-------+-----+-------+-----------+
| S| 1963| 4|[1963 -> 4]|
| BE| 1963| 1|[1963 -> 1]|
| S| 1964| 1|[1964 -> 1]|
| BE| 1964| 1|[1964 -> 1]|
+-------+-----+-------+-----------+
from typing import List, Dict
from pyspark.sql.functions import udf
from functools import reduce
from pyspark.sql.types import *
def combine_map(x: Dict[str, int], y: Dict[str, int]) -> Dict[str, int]:
return {k: x.get(k, 0) + y.get(k, 0) for k in set(x) | set(y)}
@udf(returnType=MapType(StringType(), IntegerType()))
def combine_maps(maps):
return reduce(combine_map, maps, {})
df2.groupBy("country").agg(F.collect_list("result").alias("result")) \
.withColumn("result", combine_maps("result")) \
.show(truncate=False)
+-------+----------------------+
|country|result |
+-------+----------------------+
|S |[1964 -> 1, 1963 -> 4]|
|BE |[1964 -> 1, 1963 -> 1]|
+-------+----------------------+
这是使用 RDD 方法的一种方式:
from operator import add
# initialize the RDD
rdd = sc.parallelize([(u'"COUNTRY"', u'"GYEAR"', u'"PATENT"')
, (u'"BE"', u'1963', u'3070801')
, (u'"BE"', u'1964', u'3070811')
, (u'"US"', u'1963', u'3070802')
, (u'"US"', u'1963', u'3070803')
, (u'"US"', u'1963', u'3070804')
, (u'"US"', u'1963', u'3070805')
, (u'"US"', u'1964', u'3070807')])
执行以下操作:
- 设置
(COUNTRY, GYEAR)
的元组为键,1
为值 - 使用 reduceByKey(add) 计算键数
- 调整key为
COUNTRY
,value为[(GYEAR, cnt)]
where cnt是由前面的reduceByKey 计算出来的
- 运行
reduceByKey(add)
用相同的键(COUNTRY
)合并列表。 使用过滤器删除 header
rdd_new = rdd.map(lambda x: ((x[0],x[1]), 1) ) \ .reduceByKey(add) \ .map(lambda x: (x[0][0], [(x[0][1],x[1])])) \ .reduceByKey(add) \ .filter(lambda x: x[0] != '"COUNTRY"')
查看结果:
>>> rdd_new.take(2)
[(u'"US"', [(u'1964', 1), (u'1963', 4)]),
(u'"BE"', [(u'1963', 1), (u'1964', 1)])]