How to standardize ONE column in Spark using StandardScaler?
I'm trying to standardize (mean = 0, standard deviation = 1) one column ('age') in my data frame. Here is my code in Spark (Python):
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
# Make my 'age' column an assembler type:
age_assembler = VectorAssembler(inputCols=["age"], outputCol="age_feature")
# Create a scaler that takes 'age_feature' as an input column:
scaler = StandardScaler(inputCol="age_feature", outputCol="age_scaled",
                        withStd=True, withMean=True)
# Creating a mini-pipeline for those 2 steps:
age_pipeline = Pipeline(stages=[age_assembler, scaler])
scaled = age_pipeline.fit(sample17)
sample17_scaled = scaled.transform(sample17)
type(sample17_scaled)
It seems to run just fine. The last line produces: "sample17_scaled:pyspark.sql.dataframe.DataFrame"
But when I run the following line, it shows that the type of the new column age_scaled is 'vector': |-- age_scaled: vector (nullable = true)
sample17_scaled.printSchema()
How can I compute anything with this new column? For example, I can't compute its mean. When I try, it says the column should be 'long' rather than udt.
Thank you very much!
Just use plain aggregations:
from pyspark.sql.functions import stddev, mean, col
sample17 = spark.createDataFrame([(1, ), (2, ), (3, )]).toDF("age")
(sample17
    .select(mean("age").alias("mean_age"), stddev("age").alias("stddev_age"))
    .crossJoin(sample17)
    .withColumn("age_scaled", (col("age") - col("mean_age")) / col("stddev_age")))
# +--------+----------+---+----------+
# |mean_age|stddev_age|age|age_scaled|
# +--------+----------+---+----------+
# | 2.0| 1.0| 1| -1.0|
# | 2.0| 1.0| 2| 0.0|
# | 2.0| 1.0| 3| 1.0|
# +--------+----------+---+----------+
Or
mean_age, stddev_age = sample17.select(mean("age"), stddev("age")).first()
sample17.withColumn("age_scaled", (col("age") - mean_age) / stddev_age)
# +---+----------+
# |age|age_scaled|
# +---+----------+
# | 1| -1.0|
# | 2| 0.0|
# | 3| 1.0|
# +---+----------+
If you want a Transformer, you can keep the StandardScaler pipeline from the question, but its output column is a vector rather than a plain numeric column.
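A minimal sketch of unpacking that vector, assuming Spark 3.0+ so that pyspark.ml.functions.vector_to_array is available (on older versions a small UDF extracting element 0 of the vector does the same job):
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col, mean
# age_scaled is a one-element vector column, so pull that element
# out into an ordinary double column before aggregating:
sample17_unpacked = sample17_scaled.withColumn(
    "age_scaled_value", vector_to_array(col("age_scaled"))[0])
# Plain SQL aggregations work again on the unpacked column:
sample17_unpacked.select(mean("age_scaled_value")).show()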