Udf 不工作

Udf not working

你能帮我优化这段代码并让它工作吗? 这是原始数据:

+--------------------+-------------+
|       original_name|medicine_name|
+--------------------+-------------+
|         Venlafaxine|  Venlafaxine|
|    Lacrifilm 5mg/ml|    Lacrifilm|
|    Lacrifilm 5mg/ml|         null|
|         Venlafaxine|         null|
|Vitamin D10,000IU...|         null|
|         paracetamol|         null|
|            mucolite|         null|

我希望得到这样的数据

+--------------------+-------------+
|       original_name|medicine_name|
+--------------------+-------------+
|         Venlafaxine|  Venlafaxine|
|    Lacrifilm 5mg/ml|    Lacrifilm|
|    Lacrifilm 5mg/ml|    Lacrifilm|
|         Venlafaxine|  Venlafaxine|
|Vitamin D10,000IU...|         null|
|         paracetamol|         null|
|            mucolite|         null|

这是代码:

distinct_df = spark.sql("select distinct medicine_name as medicine_name from medicine where medicine_name is not null")
distinct_df.createOrReplaceTempView("distinctDF")

def getMax(num1, num2):
    pmax = (num1>=num2)*num1+(num2>num1)*num2
    return pmax

def editDistance(s1, s2):
    ed = (getMax(length(s1), length(s2)) - levenshtein(s1,s2))/
          getMax(length(s1), length(s2))
    return ed

editDistanceUdf = udf(lambda x,y: editDistance(x,y), FloatType())

def getSimilarity(str):
    res = spark.sql("select medicine_name, editDistanceUdf('str', medicine_name) from distinctDf where editDistanceUdf('str', medicine_name)>=0.85 order by 2")
    res['medicine_name'].take(1)
    return res

getSimilarityUdf = udf(lambda x: getSimilarity(x), StringType())
res_df = df.withColumn('m_name', when((df.medicine_name.isNull)|(df.medicine_name.=="null")),getSimilarityUdf(df.original_name)
.otherwise(df.medicine_name)).show()

现在出现错误:

command_part = REFERENCE_TYPE + parameter._get_object_id() AttributeError: 'function' object has no attribute '_get_object_id'

你的代码有一堆问题:

  • 您不能在 udf 中使用 SparkSession 或分布式对象。所以 getSimilarity 根本行不通。如果你想像这样比较对象,你必须 join.
  • 如果 lengthlevenshtein 来自 pyspark.sql.functions,则不能在 UserDefinedFunctions 中使用。有专门用来生成SQL表达式,从*ColumnColumn的映射。
  • isNull 不是 property 的方法,因此应称为:

    df.medicine_name.isNull()
    
  • 正在关注

    df.medicine_name.=="null"
    

    不是语法上有效的 Python(看起来像 Scala calque)并且会抛出编译器异常。

  • 如果在 UserDefinedFunction 中允许 SparkSession 访问,这将不是有效的替换

    spark.sql("select medicine_name, editDistanceUdf('str', medicine_name) from distinctDf where editDistanceUdf('str', medicine_name)>=0.85 order by 2")
    

    你应该使用字符串格式化方法

    spark.sql("select medicine_name, editDistanceUdf({str}, medicine_name) from distinctDf where editDistanceUdf({str}, medicine_name)>=0.85 order by 2".format(str=str))
    
  • 可能还有其他问题,但由于您没有提供MCVE,所以其他一切都是猜测。

当你改正较小的错误时,你有两个选择:

  • 使用crossJoin:

    combined = df.alias("left").crossJoin(spark.table("distinctDf").alias("right"))
    

    然后应用 udf、过滤器和 中列出的方法之一来最接近组中的匹配项。

  • 使用内置的近似匹配工具,如 Efficient string matching in Apache Spark

  • 中所述