PySpark 代码结构

PySpark code structure

我正在编写 PySpark 代码,我在其中进行了 10 次查找 table,对于每次查找 table,我定义了一个结构,然后定义了一个模式。然后,我为每个查找 table 创建一个 DF,最后使用它们与一个主要 table 连接。我知道如何编码,但有人可以指导我如何构建代码吗?我是 Python 的新手,所以不知道如何在 PySpark 中组织我的代码。也许与我分享一些示例生产 PySpark 代码?谢谢!

为了恢复和管理您的代码,您可以在不同的 class 中定义不同的部分。 我的方法是创建一个 iniyaml 作为参考。 此外,您还可以在测试和生产环境中处理输入变量。

例如:

主要class:



from sparkSchema import SparkSchema
from configparser import ConfigParser 

if __name__ == "__main__":

    config_path = './config.ini'
    config = ConfigParser()
    config.optionxform = str
    config.read(config_path)

#initialize your app, create session and context, etc. 
#also you can handle this part with some class.

streamDFWithSchema  = SparkSchema(streamDF, config, 'Schema').getDFWithSchema()

# rest of the code

最好将每个输入变量(例如SparkSchema中的Schema class)定义为输入变量,可以使用argpars library.I本例中没有使用[= =20=]

SparkSchema class:

from pyspark.sql.functions import to_timestamp

class SparkSchema:
    
    def __init__(self, DF, config, section):
        self.DF =DF
        self.config = config
        self.section = section


    def getDFWithSchema(self):
        
        self.DF  = self.DF \
      .selectExpr(
   'CAST(split(KafkaValue,"'+self.config[self.section]['message.delimiter']+'")['
   +self.config[self.section]['grouped.column.index']+
   '] AS STRING) as '+self.config[self.section]['grouped.column.name'] \
   ,'CAST(split(KafkaValue,"'+self.config[self.section]['message.delimiter']+'")['
   +self.config[self.section]['date.column.index']+
   '] AS STRING) as '+self.config[self.section]['date.column.name'])

        self.DF = self.DF\
            .withColumn('EventDate',
            to_timestamp(self.config[self.section]['date.column.name']
            , self.config[self.section]['date.column.format']))
        self.DF.printSchema()
        return self.DF

.ini 文件:

.
.
.
[Schema]
message.delimiter=\\|
grouped.column.name=EmployeeId
grouped.column.index=61
date.column.name=END_DATE
date.column.index=12
date.column.format=yyyyMMddHHmmss
[SchemaLocal]
message.delimiter=\\|
grouped.column.name=EmployeeId
grouped.column.index=2
date.column.name=END_DATE
date.column.index=3
date.column.format=yyyy-MM-dd HHmmss
.
.
.

并且您应该在 spark-submit 命令中将库 configparser 添加为 --py-files

$ spark-submit --jars some.jar,jar.file\
  --py-files configparser.zip,argpars.zip\
  main_class.py