Pyspark:如何在 HDFS 中并行处理多 gz 文件
Pyspark: How to parallelize multi gz file processing in HDFS
我有许多 gz
文件存储在 20 个节点 HDFS
集群中,需要按列聚合。 gz
个文件非常大(每个 1GByte,总共 200 个文件)。
数据格式为key-value+2列值:['key','value1','value2']
,需要按key分组,按列聚合:sum(value1)
,count(value2)
.
数据已经按键排序,每个gz文件都有独占键值。
例如:
File 1:
k1,v1,u1
k1,v2,u1
k2,v2,u2
k3,v3,u3
k3,v4,u4
File 2:
k4,v5,u6
k4,v7,u8
k5,v9,v10
File 3:
k6,...
...
...
File 200:
k200,v200,u200
k201,v201,u201
我首先解析日期并将数据转换为(key, list of (values))
结构。解析器输出将是这样的:
parser output
(k1,[v1,u1])
(k1,[v2,u1])
(k2,[v2,u2])
(k3,[v3,u3])
(k3,[v4,u4])
然后使用reduceByKey
函数按键值分组,这比groupByKey
函数更有效。
reducer output:
(k1,[[v1,u1],[v2,u1])
(k2,[[v2,u2]])
(k3,[[v3,u3],[v4,u4]])
然后使用过程函数聚合列:
process
(k1, sum([v1,v2], len([u1,u3])))
(k2, sum([v2], len([u2])))
(k3, sum([v3,v4], len([u3,u4])))
这是该过程的示例代码
import pyspark
from pyspark import SparkFiles
def parser(line):
try:
key,val=line.split('\t)
return (key,[val1,val2])
except:
return None
def process(line):
key,gr= line[0],line[1]
vals=zip(*gr)
val1=sum(vals[0])
val2=len(vals[1])
return ('\t'.join([key,val1,val2]))
sc = pyspark.SparkContext(appName="parse")
logs=sc.textFile("hdfs:///home/user1/*.gz")
proc=logs.map(parser).filter(bool).reduceByKey(lambda acc,x: acc+x).map(process)
proc.saveAsTextFile('hdfs:///home/user1/output1')
我认为这段代码没有充分利用spark集群。我喜欢优化代码以充分利用处理考虑。
1.在 HDFS 和 Pyspark 中处理 gz 文件的最佳方式是什么? -- 如何将 gz 文件处理完全分布到整个集群?
2。如何充分利用每个节点中的所有CPU?进行聚合和解析过程
您至少应该考虑以下几点:
- 如果您使用的是 YARN,则执行器的数量以及您分配给 Spark 应用程序的每个执行器的内核数。它们可以由 --num-executors 和 --executor-cores 控制。如果您不使用 YARN,您的调度程序可能会有类似的机制来控制并行度,请尝试寻找它。
- DataFrame 中的分区数,它直接影响作业的并行度。您可以使用 repartition and/or coalesce.
来控制它
两者都可以限制作业使用的内核,从而限制集群的使用。此外,请注意,使用更多 CPU 并不一定意味着更好的性能(或执行时间)。这将取决于集群的大小和问题的大小,我不知道有什么简单的规则可以决定这一点。对我来说,通常归结为尝试不同的配置,看看哪一个具有更好的性能。
我有许多 gz
文件存储在 20 个节点 HDFS
集群中,需要按列聚合。 gz
个文件非常大(每个 1GByte,总共 200 个文件)。
数据格式为key-value+2列值:['key','value1','value2']
,需要按key分组,按列聚合:sum(value1)
,count(value2)
.
数据已经按键排序,每个gz文件都有独占键值。
例如:
File 1:
k1,v1,u1
k1,v2,u1
k2,v2,u2
k3,v3,u3
k3,v4,u4
File 2:
k4,v5,u6
k4,v7,u8
k5,v9,v10
File 3:
k6,...
...
...
File 200:
k200,v200,u200
k201,v201,u201
我首先解析日期并将数据转换为(key, list of (values))
结构。解析器输出将是这样的:
parser output
(k1,[v1,u1])
(k1,[v2,u1])
(k2,[v2,u2])
(k3,[v3,u3])
(k3,[v4,u4])
然后使用reduceByKey
函数按键值分组,这比groupByKey
函数更有效。
reducer output:
(k1,[[v1,u1],[v2,u1])
(k2,[[v2,u2]])
(k3,[[v3,u3],[v4,u4]])
然后使用过程函数聚合列:
process
(k1, sum([v1,v2], len([u1,u3])))
(k2, sum([v2], len([u2])))
(k3, sum([v3,v4], len([u3,u4])))
这是该过程的示例代码
import pyspark
from pyspark import SparkFiles
def parser(line):
try:
key,val=line.split('\t)
return (key,[val1,val2])
except:
return None
def process(line):
key,gr= line[0],line[1]
vals=zip(*gr)
val1=sum(vals[0])
val2=len(vals[1])
return ('\t'.join([key,val1,val2]))
sc = pyspark.SparkContext(appName="parse")
logs=sc.textFile("hdfs:///home/user1/*.gz")
proc=logs.map(parser).filter(bool).reduceByKey(lambda acc,x: acc+x).map(process)
proc.saveAsTextFile('hdfs:///home/user1/output1')
我认为这段代码没有充分利用spark集群。我喜欢优化代码以充分利用处理考虑。
1.在 HDFS 和 Pyspark 中处理 gz 文件的最佳方式是什么? -- 如何将 gz 文件处理完全分布到整个集群?
2。如何充分利用每个节点中的所有CPU?进行聚合和解析过程
您至少应该考虑以下几点:
- 如果您使用的是 YARN,则执行器的数量以及您分配给 Spark 应用程序的每个执行器的内核数。它们可以由 --num-executors 和 --executor-cores 控制。如果您不使用 YARN,您的调度程序可能会有类似的机制来控制并行度,请尝试寻找它。
- DataFrame 中的分区数,它直接影响作业的并行度。您可以使用 repartition and/or coalesce. 来控制它
两者都可以限制作业使用的内核,从而限制集群的使用。此外,请注意,使用更多 CPU 并不一定意味着更好的性能(或执行时间)。这将取决于集群的大小和问题的大小,我不知道有什么简单的规则可以决定这一点。对我来说,通常归结为尝试不同的配置,看看哪一个具有更好的性能。