How to speed up PySpark computation
The source data consists of event logs from devices; everything is in JSON format.
Sample raw JSON records:
{"sn": "123", "ip": null, "evt_name": "client_requestData", "evt_content": {"count": 1, "hour": 13, "dow": 0, "segmentation": {"requestService": "music", "requestData": "is_online", "requestOpcode": "get_state"}, "sum": 0}, "evt_ts": 1521350052, "app_key": "f6e7f4f8ec4b4d6dae6fa2b5ed8f90cb6a640759", "sdk_name": "countlysdk_0.0.9", "sdk_version": "17.05"}
{"sn": "123", "ip": null, "evt_name": "client_requestData2", "evt_content": {"count": 1, "hour": 13, "dow": 0, "segmentation": {"requestService": "fm", "requestData": "is_online", "requestOpcode": "get_state"}, "sum": 0}, "evt_ts": 1521350053, "app_key": "f6e7f4f8ec4b4d6dae6fa2b5ed8f90cb6a640759", "sdk_name": "countlysdk_0.0.9", "sdk_version": "17.05"}
{"sn": "123", "ip": null, "evt_name": "client_requestData3", "evt_content": {"count": 1, "hour": 13, "dow": 0, "segmentation": {"requestService": "video", "requestData": "is_online", "requestOpcode": "get_state"}, "sum": 0}, "evt_ts": 1521350054, "app_key": "f6e7f4f8ec4b4d6dae6fa2b5ed8f90cb6a640759", "sdk_name": "countlysdk_0.0.9", "sdk_version": "17.05"}
{"sn": "123", "ip": null, "evt_name": "client_requestData4", "evt_content": {"count": 1, "hour": 13, "dow": 0, "segmentation": {"requestService": "fm", "requestData": "is_online", "requestOpcode": "get_state"}, "sum": 0}
I have a list of events, e.g. tar_task_list, with roughly 100 or more entries. For each event name I need to collect all matching records from the raw data and save them to a per-event CSV file.
The code is as follows:
# read source data
raw_data = sc.textFile("s3://xxx").map(lambda x: json.loads(x))

# TODO: NEED TO SPEED UP THIS COMPUTING
for tar_evt_name in evts:
    print("...")
    table_name = out_table_prefix + tar_evt_name
    evt_one_rdd = raw_data.filter(lambda x: x.get("evt_name") == tar_evt_name)
    evt_one_rdd.cache()
    evt_one_dict = evt_one_rdd.first()
    Evt_one = Row(*sorted(['{}'.format(k) for k, v in evt_one_dict.items()]))
    col_len = len(evt_one_rdd.first())
    evt_one_rdd2 = evt_one_rdd.map(lambda x: to_list(x, col_len)).filter(lambda x: len(x) is not 0)
    evt_one_rdd2.cache()
    df = spark.createDataFrame(evt_one_rdd2.map(lambda x: Evt_one(*x)))
    out_csv_path = output + '/' + tar_evt_name + '/'  # trailing '/' to avoid a copy error
    df.write.csv(out_csv_path, mode='overwrite', header=True, sep='|', nullValue="NULL")
The output data looks like this:
时间:2018-05-07 00:03|8dab4796-fa37-4114-0011-7637fa2b0001|f6e7f4f8ec4b4d6dae6fa2b5ed8f90cb6a640759|0.2.23|131074|2018-05-08 [=21|default|0|false 130
Here is my attempt.

I noticed a few problems:

- for tar_evt_name in evts is a plain Python for loop, which hurts performance when what you really want is a group-by style operation;
- .cache() is called, but it doesn't seem to buy you anything here;
- not sure what to_list is;
- I don't think evt_one_rdd2.map(lambda x: Evt_one(*x)) will work;
import json
from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.sql import Window
raw_data = sc.textFile('test.txt')
df = raw_data.map(
# Map the raw input to python dict using `json.loads`
json.loads,
).map(
# Duplicate the evt_name and evt_ts for later use in a Row object
lambda x: Row(evt_name=x['evt_name'], evt_ts=x.get('evt_ts', 1), data=x),
).toDF() # Convert into a dataframe...
# ... (I am actually unsure if this is faster...
# ... but I am more comfortable with this)
filtered_df = df.withColumn(
# NOTE: Assumed you want the first row, as you used `evt_one_rdd.first()`.
# So we assign a row number (named rn) and then filter on rn = 1.
# Here the evt_name and evt_ts becomes handy, you might want to set
# your own evt_ts properly.
'rn', F.row_number().over(
Window.partitionBy(df['evt_name']).orderBy(df['evt_ts'])
),
).filter('rn = 1').where(
# NOTE: Since you used `map(lambda x: to_list(x, col_len)).filter(lambda x: len(x) is not 0)`,
# I assume you meant data should have more than 0 keys,
# but this should be almost always true?
# Since you are grouping by `evt_name`, which means
# there is at least that key most of the time.
F.size(F.col('data')) > 0
)
filtered_df.write.csv(...)  # .write is a property, so chain a writer method such as .csv()
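A follow-up thought: if the end goal is really one CSV folder per event name (rather than only the first row per event), the Python loop and the ~100 separate filters can be avoided entirely by reading the JSON once with spark.read.json and letting the writer partition the output by evt_name. The sketch below is only a rough illustration under that assumption; the input path, tar_task_list and the writer options are placeholders copied from the question, and nested fields are turned back into JSON strings because the CSV writer only accepts atomic column types.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Read the JSON lines once; Spark infers the schema, no json.loads needed.
events = spark.read.json("s3://xxx")

# The CSV writer cannot handle struct columns, so serialize evt_content
# back to a JSON string (or select only the nested fields you actually need).
events = events.withColumn("evt_content", F.to_json(F.col("evt_content")))

# One pass over the data: keep only the target events and let the writer
# create one sub-directory per event name.
(events
    .filter(F.col("evt_name").isin(tar_task_list))
    .write
    .partitionBy("evt_name")
    .csv(output, mode="overwrite", header=True, sep="|", nullValue="NULL"))

With partitionBy the event name ends up in the directory name (evt_name=client_requestData/...) instead of inside the files, so the layout differs slightly from output + '/' + tar_evt_name + '/', but everything is written in a single Spark job instead of one job per event.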