Vector operation on pyspark dataframe
I am new to pyspark.
My dataframe:
df = spark.createDataFrame([[10, 8], [3, 5], [1, 3], [1, 5], [2, 8], [8, 7]], list('AB'))
df.show()
+---+---+
| A| B|
+---+---+
| 10| 8|
| 3| 5|
| 1| 3|
| 1| 5|
| 2| 8|
| 8| 7|
+---+---+
Convert col 'A' & col 'B' into a vector via VectorAssembler:
from pyspark.ml.feature import VectorAssembler, Normalizer
Vector = VectorAssembler(inputCols=['A','B'], outputCol="Vector_AB").transform(df)
Get the unit vector Unit_AB via Normalizer:
Vector = Normalizer(inputCol="Vector_AB", outputCol="Unit_AB", p=2).transform(Vector)
+---+---+----------+--------------------+
| A| B| Vector_AB| Unit_AB|
+---+---+----------+--------------------+
| 10| 8|[10.0,8.0]|[0.78086880944303...|
| 3| 5| [3.0,5.0]|[0.51449575542752...|
| 1| 3| [1.0,3.0]|[0.31622776601683...|
| 1| 5| [1.0,5.0]|[0.19611613513818...|
| 2| 8| [2.0,8.0]|[0.24253562503633...|
| 8| 7| [8.0,7.0]|[0.75257669470687...|
+---+---+----------+--------------------+
How to calculate the inner product of Vector_AB with itself? (This is the squared 2-norm.)
For example,
inputCol: 'Vector_AB' --> [10.0,8.0]
gives outputCol: Inner_Product_AB --> (10^2 + 8^2) = 164
I tried:
Vector = Vector.withColumn('Inner_Product_AB', Vector['A']*Vector['A']+Vector['B']*Vector['B'])
Is there any built-in function to get this result?
The dataframe I want:
+---+---+----------+--------------------+----------------+
| A| B| Vector_AB| Norm_AB|Inner_Product_AB|
+---+---+----------+--------------------+----------------+
| 10| 8|[10.0,8.0]|[0.78086880944303...| 164|
| 3| 5| [3.0,5.0]|[0.51449575542752...| 34|
| 1| 3| [1.0,3.0]|[0.31622776601683...| 10|
| 1| 5| [1.0,5.0]|[0.19611613513818...| 26|
| 2| 8| [2.0,8.0]|[0.24253562503633...| 68|
| 8| 7| [8.0,7.0]|[0.75257669470687...| 113|
+---+---+----------+--------------------+----------------+
Then I want to do the vector operation col['Norm_AB'] / col['Inner_Product_AB'].
Is there any built-in function to do this?
"How to calculate the inner product of Vector_AB? (2 norm)"

One way is to use the vector's built-in dot method: define a UDF that operates on pyspark.ml.linalg.DenseVector objects and returns the inner product (this assumes import pyspark.sql.functions as F and from pyspark.sql.types import LongType):
dot_prod_udf = F.udf(lambda v: int(v.dot(v)), LongType())
Full example:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import FloatType
data = [
    {"A": 10, "B": 8},
    {"A": 3, "B": 5},
    {"A": 1, "B": 3},
    {"A": 1, "B": 5},
    {"A": 2, "B": 8},
    {"A": 8, "B": 7},
]
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data)
Vector = VectorAssembler(inputCols=["A", "B"], outputCol="Vector_AB").transform(df)
# inner product of the vector with itself, via DenseVector.dot
dot_prod_udf = F.udf(lambda v: float(v.dot(v)), FloatType())
# element-wise division of a DenseVector by a scalar; returns a DenseVector
norm_udf = F.udf(lambda x, y: x / y, VectorUDT())
Vector = Vector.withColumn("Inner_Product_AB", dot_prod_udf("Vector_AB"))
Vector = Vector.withColumn("Inner_Product_AB_sqrt", F.sqrt("Inner_Product_AB"))
# Norm_AB: Vector_AB scaled to unit length (matches Normalizer with p=2)
Vector = Vector.withColumn("Norm_AB", norm_udf("Vector_AB", "Inner_Product_AB_sqrt"))
Result:
+---+---+----------+----------------+---------------------+----------------------------------------+
|A |B |Vector_AB |Inner_Product_AB|Inner_Product_AB_sqrt|Norm_AB |
+---+---+----------+----------------+---------------------+----------------------------------------+
|10 |8 |[10.0,8.0]|164.0 |12.806248474865697 |[0.7808688094430304,0.6246950475544243] |
|3 |5 |[3.0,5.0] |34.0 |5.830951894845301 |[0.5144957554275265,0.8574929257125441] |
|1 |3 |[1.0,3.0] |10.0 |3.1622776601683795 |[0.31622776601683794,0.9486832980505138]|
|1 |5 |[1.0,5.0] |26.0 |5.0990195135927845 |[0.19611613513818404,0.9805806756909202]|
|2 |8 |[2.0,8.0] |68.0 |8.246211251235321 |[0.24253562503633297,0.9701425001453319]|
|8 |7 |[8.0,7.0] |113.0 |10.63014581273465 |[0.7525766947068778,0.658504607868518] |
+---+---+----------+----------------+---------------------+----------------------------------------+
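Side note: on Spark 3.1+ the same results can be obtained without a Python UDF. A minimal sketch, assuming the Vector_AB column from above; it uses pyspark.ml.functions.vector_to_array plus the built-in higher-order functions aggregate and transform:
from pyspark.ml.functions import vector_to_array
import pyspark.sql.functions as F
# convert the ML vector to a plain array<double> column
arr = vector_to_array(F.col("Vector_AB"))
# sum of squared elements = inner product of the vector with itself
Vector = Vector.withColumn("Inner_Product_AB",
                           F.aggregate(arr, F.lit(0.0), lambda acc, x: acc + x * x))
# element-wise division by the 2-norm; note the result is array<double>, not VectorUDT
Vector = Vector.withColumn("Norm_AB",
                           F.transform(arr, lambda x: x / F.sqrt(F.col("Inner_Product_AB"))))
This is only a sketch; if a true ML vector is needed downstream, pyspark.ml.functions.array_to_vector (also Spark 3.1+) can convert Norm_AB back.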