PySpark 代码结构
PySpark code structure
我正在编写 PySpark 代码,我在其中进行了 10 次查找 table,对于每次查找 table,我定义了一个结构,然后定义了一个模式。然后,我为每个查找 table 创建一个 DF,最后使用它们与一个主要 table 连接。我知道如何编码,但有人可以指导我如何构建代码吗?我是 Python 的新手,所以不知道如何在 PySpark 中组织我的代码。也许与我分享一些示例生产 PySpark 代码?谢谢!
为了恢复和管理您的代码,您可以在不同的 class 中定义不同的部分。
我的方法是创建一个 ini
或 yaml
作为参考。
此外,您还可以在测试和生产环境中处理输入变量。
例如:
主要class:
from sparkSchema import SparkSchema
from configparser import ConfigParser
if __name__ == "__main__":
config_path = './config.ini'
config = ConfigParser()
config.optionxform = str
config.read(config_path)
#initialize your app, create session and context, etc.
#also you can handle this part with some class.
streamDFWithSchema = SparkSchema(streamDF, config, 'Schema').getDFWithSchema()
# rest of the code
最好将每个输入变量(例如SparkSchema中的Schema class)定义为输入变量,可以使用argpars
library.I本例中没有使用[= =20=]
SparkSchema class:
from pyspark.sql.functions import to_timestamp
class SparkSchema:
def __init__(self, DF, config, section):
self.DF =DF
self.config = config
self.section = section
def getDFWithSchema(self):
self.DF = self.DF \
.selectExpr(
'CAST(split(KafkaValue,"'+self.config[self.section]['message.delimiter']+'")['
+self.config[self.section]['grouped.column.index']+
'] AS STRING) as '+self.config[self.section]['grouped.column.name'] \
,'CAST(split(KafkaValue,"'+self.config[self.section]['message.delimiter']+'")['
+self.config[self.section]['date.column.index']+
'] AS STRING) as '+self.config[self.section]['date.column.name'])
self.DF = self.DF\
.withColumn('EventDate',
to_timestamp(self.config[self.section]['date.column.name']
, self.config[self.section]['date.column.format']))
self.DF.printSchema()
return self.DF
.ini
文件:
.
.
.
[Schema]
message.delimiter=\\|
grouped.column.name=EmployeeId
grouped.column.index=61
date.column.name=END_DATE
date.column.index=12
date.column.format=yyyyMMddHHmmss
[SchemaLocal]
message.delimiter=\\|
grouped.column.name=EmployeeId
grouped.column.index=2
date.column.name=END_DATE
date.column.index=3
date.column.format=yyyy-MM-dd HHmmss
.
.
.
并且您应该在 spark-submit 命令中将库 configparser
添加为 --py-files
:
$ spark-submit --jars some.jar,jar.file\
--py-files configparser.zip,argpars.zip\
main_class.py
我正在编写 PySpark 代码,我在其中进行了 10 次查找 table,对于每次查找 table,我定义了一个结构,然后定义了一个模式。然后,我为每个查找 table 创建一个 DF,最后使用它们与一个主要 table 连接。我知道如何编码,但有人可以指导我如何构建代码吗?我是 Python 的新手,所以不知道如何在 PySpark 中组织我的代码。也许与我分享一些示例生产 PySpark 代码?谢谢!
为了恢复和管理您的代码,您可以在不同的 class 中定义不同的部分。
我的方法是创建一个 ini
或 yaml
作为参考。
此外,您还可以在测试和生产环境中处理输入变量。
例如:
主要class:
from sparkSchema import SparkSchema
from configparser import ConfigParser
if __name__ == "__main__":
config_path = './config.ini'
config = ConfigParser()
config.optionxform = str
config.read(config_path)
#initialize your app, create session and context, etc.
#also you can handle this part with some class.
streamDFWithSchema = SparkSchema(streamDF, config, 'Schema').getDFWithSchema()
# rest of the code
最好将每个输入变量(例如SparkSchema中的Schema class)定义为输入变量,可以使用argpars
library.I本例中没有使用[= =20=]
SparkSchema class:
from pyspark.sql.functions import to_timestamp
class SparkSchema:
def __init__(self, DF, config, section):
self.DF =DF
self.config = config
self.section = section
def getDFWithSchema(self):
self.DF = self.DF \
.selectExpr(
'CAST(split(KafkaValue,"'+self.config[self.section]['message.delimiter']+'")['
+self.config[self.section]['grouped.column.index']+
'] AS STRING) as '+self.config[self.section]['grouped.column.name'] \
,'CAST(split(KafkaValue,"'+self.config[self.section]['message.delimiter']+'")['
+self.config[self.section]['date.column.index']+
'] AS STRING) as '+self.config[self.section]['date.column.name'])
self.DF = self.DF\
.withColumn('EventDate',
to_timestamp(self.config[self.section]['date.column.name']
, self.config[self.section]['date.column.format']))
self.DF.printSchema()
return self.DF
.ini
文件:
.
.
.
[Schema]
message.delimiter=\\|
grouped.column.name=EmployeeId
grouped.column.index=61
date.column.name=END_DATE
date.column.index=12
date.column.format=yyyyMMddHHmmss
[SchemaLocal]
message.delimiter=\\|
grouped.column.name=EmployeeId
grouped.column.index=2
date.column.name=END_DATE
date.column.index=3
date.column.format=yyyy-MM-dd HHmmss
.
.
.
并且您应该在 spark-submit 命令中将库 configparser
添加为 --py-files
:
$ spark-submit --jars some.jar,jar.file\
--py-files configparser.zip,argpars.zip\
main_class.py