pyspark.ml pipelines: are custom transformers necessary for basic preprocessing tasks?
Getting started with pyspark.ml and the pipelines API, I find myself writing custom transformers for typical preprocessing tasks in order to use them in a Pipeline. Examples:
from pyspark.ml import Pipeline, Transformer


class CustomTransformer(Transformer):
    # lazy workaround - a transformer needs to have these attributes
    _defaultParamMap = dict()
    _paramMap = dict()
    _params = dict()


class ColumnSelector(CustomTransformer):
    """Transformer that selects a subset of columns
    - to be used as pipeline stage"""

    def __init__(self, columns):
        self.columns = columns

    def _transform(self, data):
        return data.select(self.columns)


class ColumnRenamer(CustomTransformer):
    """Transformer renames one column"""

    def __init__(self, rename):
        self.rename = rename

    def _transform(self, data):
        (colNameBefore, colNameAfter) = self.rename
        return data.withColumnRenamed(colNameBefore, colNameAfter)


class NaDropper(CustomTransformer):
    """
    Drops rows with at least one not-a-number element
    """

    def __init__(self, cols=None):
        self.cols = cols

    def _transform(self, data):
        dataAfterDrop = data.dropna(subset=self.cols)
        return dataAfterDrop


class ColumnCaster(CustomTransformer):
    def __init__(self, col, toType):
        self.col = col
        self.toType = toType

    def _transform(self, data):
        return data.withColumn(self.col, data[self.col].cast(self.toType))
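For illustration, this is how I chain them (a minimal sketch; the DataFrame df and its columns id and raw_value are made up for this example):

pipeline = Pipeline(stages=[
    ColumnSelector(["id", "raw_value"]),
    ColumnRenamer(("raw_value", "value")),
    NaDropper(["value"]),
    ColumnCaster("value", "double"),
])
dfClean = pipeline.fit(df).transform(df)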
They work, but I was wondering whether this is a pattern or an antipattern - are such transformers a good way to work with the pipeline API? Was it necessary to implement them, or is equivalent functionality provided somewhere else?
I'd say it is primarily opinion based, although it looks unnecessarily verbose and Python Transformers don't integrate well with the rest of the Pipeline API.
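To give a sense of that verbosity: a fully parameterized Python Transformer built on the pyspark.ml param mixins looks roughly like the sketch below. It assumes Spark 2.1+ (for self._input_kwargs), and NullFlagger is a made-up example, not something shipped with Spark:

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql import functions as F


class NullFlagger(Transformer, HasInputCol, HasOutputCol):
    """Adds a boolean column flagging nulls in inputCol - illustrative only."""

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(NullFlagger, self).__init__()
        # @keyword_only stores the constructor kwargs on the instance
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def _transform(self, dataset):
        return dataset.withColumn(
            self.getOutputCol(), F.col(self.getInputCol()).isNull()
        )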
It is also worth pointing out that everything you have here can be easily achieved with SQLTransformer. For example:
from pyspark.ml.feature import SQLTransformer


def column_selector(columns):
    return SQLTransformer(
        statement="SELECT {} FROM __THIS__".format(", ".join(columns))
    )
or
def na_dropper(columns):
    return SQLTransformer(
        statement="SELECT * FROM __THIS__ WHERE {}".format(
            " AND ".join(["{} IS NOT NULL".format(x) for x in columns])
        )
    )
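These drop straight into a Pipeline like any other stage. A quick usage sketch with made-up data:

from pyspark.ml import Pipeline
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, "a", None), (2, "b", 3.0)], ["id", "label", "value"]
)

pipeline = Pipeline(stages=[
    column_selector(["id", "value"]),
    na_dropper(["value"]),
])
pipeline.fit(df).transform(df).show()  # keeps only the row where value is set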
With a little effort you can use SQLAlchemy with a Hive dialect to avoid handwritten SQL.
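A rough sketch of that idea, assuming SQLAlchemy 1.4+ and the third-party PyHive dialect are installed (both the package and the exact import path are assumptions to verify against your environment):

from pyhive.sqlalchemy_hive import HiveDialect  # assumed import path (PyHive)
from sqlalchemy import and_, column, literal_column, select, table

from pyspark.ml.feature import SQLTransformer


def na_dropper(columns):
    # Compose SELECT * FROM __THIS__ WHERE ... IS NOT NULL without string concatenation
    stmt = (
        select(literal_column("*"))
        .select_from(table("__THIS__"))
        .where(and_(*[column(c).is_not(None) for c in columns]))
    )
    # Compile with the Hive dialect so identifiers are backtick-quoted,
    # which Spark SQL accepts
    return SQLTransformer(statement=str(stmt.compile(dialect=HiveDialect())))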