在另一个 pyspark 文件中使用 1 python 文件中的方法

Using the methods in 1 python file in another pyspark file

我正在尝试将下面 python 代码文件中定义的方法继承到另一个 pyspark 文件。 运行 我的代码在 Python3 环境中。

文件 >> email_notifications.py

import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
    
class notification(object):
    def __init__(self, to, subject, email_body):
        self.to         = to
        self.from_addr  = to
        self.subject    = subject
        self.email_body = email_body    

    def send_email(self):
        recipient = self.to
        sender = self.from_addr
        subject = subj
        msg = MIMEMultipart()
        msg['Subject'] = subject
        msg['From'] = sender
        msg['To'] = recipient
        msg.attach(MIMEText(self.email_body))
        session = smtplib.SMTP('localhost')
        session.sendmail(sender, recipient, msg.as_string())
        session.quit()

文件 >> data_check.py

from pyspark import SparkContext, SparkConf
import email_notifications

class data_notify(object):
    def __init__(self):
        
        self.spark = SparkSession \
                            .builder \
                            .appName ( "POC" ) \
                            .config ( "spark.serializer", "org.apache.spark.serializer.KryoSerializer" ) \
                            .config ( "spark.shuffle.compress", "true" ) \
                            .enableHiveSupport ().getOrCreate ()
                            
                            
    def main(self):
        get_object = email_notifications.notification()
        send_email = ge_object.send_email()
        email_to = 'abcd@ymail.com'
        spark = self.spark
        df = spark.sql("select * from db.spark_table")
        val = df.count()
        
        if val = 0:
            print("load is not complete")
            sub = 'Check data load'
            body = "the count of xxx table is : " + str(val) + "\nPlease validate "
            send_email(email_to,sub,body)
        else:
            print("Completed successfully")
            
if __name__ == "__main__":
    try:
        print("~~ Running validator app")
        data_notify().main()
    except Exception as e:
        print("##### Error running data_notify")
        raise Exception(e)

但是当我这样做时,我遇到了类似

的错误
__init__() takes exactly 4 arguments (1 given)

__init__() missing 3 required positional arguments: 'to' , 'subject' and 'email_body'

有人可以帮我解决这个问题吗?

提前致谢。

您需要将三个参数传递给初始化程序,因此它应该如下所示

get_object = email_notifications.notification(to, subject, email_body)

或者将 notification__init__ 函数签名更改为不带参数。

下一行也有错别字
send_email = ge_object.send_email()
应该是
send_email = get_object.send_email()