将两个 Spark mllib 管道连接在一起
Join two Spark mllib pipelines together
我有两个单独的 DataFrames
,每个都有几个不同的处理阶段,我在管道中使用 mllib
转换器来处理。
我现在想将这两个管道连接在一起,保留每个 DataFrame
的特征(列)。
Scikit-learn 有 FeatureUnion
class 来处理这个问题,我似乎找不到 mllib
.
的等价物
我可以在一个管道的末尾添加一个自定义转换器阶段,将另一个管道生成的 DataFrame 作为属性并将其加入转换方法中,但这看起来很乱。
Pipeline
或 PipelineModel
是有效的 PipelineStages
,因此可以组合成一个 Pipeline
。例如:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
df = spark.createDataFrame([
(1.0, 0, 1, 1, 0),
(0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))
pipeline1 = Pipeline(stages=[
VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
])
pipeline2 = Pipeline(stages=[
VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])
你可以组合 Pipelines
:
Pipeline(stages=[
pipeline1, pipeline2,
VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
]).fit(df).transform(df)
+-----+---+---+---+---+---------+---------+-----------------+
|label|x1 |x2 |x3 |x4 |features1|features2|features |
+-----+---+---+---+---+---------+---------+-----------------+
|1.0 |0 |1 |1 |0 |[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
|0.0 |1 |0 |0 |1 |[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
+-----+---+---+---+---+---------+---------+-----------------+
或预装PipelineModels
:
model1 = pipeline1.fit(df)
model2 = pipeline2.fit(df)
Pipeline(stages=[
model1, model2,
VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
]).fit(df).transform(df)
+-----+---+---+---+---+---------+---------+-----------------+
|label| x1| x2| x3| x4|features1|features2| features|
+-----+---+---+---+---+---------+---------+-----------------+
| 1.0| 0| 1| 1| 0|[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
| 0.0| 1| 0| 0| 1|[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
+-----+---+---+---+---+---------+---------+-----------------+
所以我推荐的方法是预先加入数据,fit
和transform
一个整体DataFrame
。
另请参阅:
我有两个单独的 DataFrames
,每个都有几个不同的处理阶段,我在管道中使用 mllib
转换器来处理。
我现在想将这两个管道连接在一起,保留每个 DataFrame
的特征(列)。
Scikit-learn 有 FeatureUnion
class 来处理这个问题,我似乎找不到 mllib
.
我可以在一个管道的末尾添加一个自定义转换器阶段,将另一个管道生成的 DataFrame 作为属性并将其加入转换方法中,但这看起来很乱。
Pipeline
或 PipelineModel
是有效的 PipelineStages
,因此可以组合成一个 Pipeline
。例如:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
df = spark.createDataFrame([
(1.0, 0, 1, 1, 0),
(0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))
pipeline1 = Pipeline(stages=[
VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
])
pipeline2 = Pipeline(stages=[
VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])
你可以组合 Pipelines
:
Pipeline(stages=[
pipeline1, pipeline2,
VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
]).fit(df).transform(df)
+-----+---+---+---+---+---------+---------+-----------------+
|label|x1 |x2 |x3 |x4 |features1|features2|features |
+-----+---+---+---+---+---------+---------+-----------------+
|1.0 |0 |1 |1 |0 |[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
|0.0 |1 |0 |0 |1 |[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
+-----+---+---+---+---+---------+---------+-----------------+
或预装PipelineModels
:
model1 = pipeline1.fit(df)
model2 = pipeline2.fit(df)
Pipeline(stages=[
model1, model2,
VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
]).fit(df).transform(df)
+-----+---+---+---+---+---------+---------+-----------------+
|label| x1| x2| x3| x4|features1|features2| features|
+-----+---+---+---+---+---------+---------+-----------------+
| 1.0| 0| 1| 1| 0|[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
| 0.0| 1| 0| 0| 1|[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
+-----+---+---+---+---+---------+---------+-----------------+
所以我推荐的方法是预先加入数据,fit
和transform
一个整体DataFrame
。
另请参阅: