在 Flink 中调试自定义管道转换器
Debug a custom Pipeline Transformer in Flink
我正在尝试按照 Flink 官方文档中的说明实现一个自定义 Transformer，但执行时 fit 操作似乎从未被调用。以下是我目前的实现：
/** Custom Flink ML [[Transformer]]; its companion object supplies the implicit fit/transform
  * operations, and `counts` holds the state produced by the fit operation.
  */
class InfoGainTransformer extends Transformer[InfoGainTransformer] {
  import InfoGainTransformer._

  // State written by the FitOperation in the companion object. It must not be `private[this]`
  // (object-private): that would hide it from `InfoGainTransformer.fitLabeledVectorInfoGain`.
  // Plain `private` still keeps it out of the public API, because a class and its companion
  // may access each other's private members. It holds the (lazily evaluated) DataSet that fit
  // builds, not a materialised collection.
  private var counts: Option[DataSet[Map[Key, Double]]] = None

  // here setters for params, as Flink does
}
// Companion object: parameter definitions and factory methods (elided here) plus the implicit
// fit/transform operations, which are found through the implicit scope of InfoGainTransformer.
// This is the version from the question, in which the body of `fit` appears never to run.
object InfoGainTransformer {
// ====================================== Parameters =============================================
// ...
// ==================================== Factory methods ==========================================
// ...
// ========================================== Operations =========================================
// FitOperation for LabeledVector input.
// NOTE(review): the DataSet returned by `input.map` is discarded and nothing is stored on
// `instance`. Flink DataSet transformations are lazily evaluated: `input.map` only adds an
// operator to the execution plan, and that operator runs only if a sink (e.g. print/collect)
// consumes a DataSet that depends on it. Because the result is dropped, the lambda below never
// executes. This is consistent with a breakpoint on `input.map` being hit (plan construction)
// while one inside the nested map is not.
implicit def fitLabeledVectorInfoGain = new FitOperation[InfoGainTransformer, LabeledVector] {
override def fit(instance: InfoGainTransformer, fitParameters: ParameterMap, input: DataSet[LabeledVector]): Unit = {
// NOTE(review): this local is an empty Vector, unrelated to the class field of the same name
// (nothing in this object writes that field). `counts(i)` below would throw
// IndexOutOfBoundsException if the lambda ever ran.
val counts = collection.immutable.Vector[Map[Key, Double]]()
input.map {
v =>
// Walks the (index, value) pairs of the record's feature vector.
v.vector.map {
case (i, value) =>
println("INSIDE!!!")
val key = Key(value, v.label)
// NOTE(review): the existing count is read but never incremented, and `+` on an immutable
// Map returns a new map, so `counts` itself is never updated.
val cval = counts(i).getOrElse(key, .0)
counts(i) + (key -> cval)
}
}
}
}
// FitOperation for input of any subtype T of Vector.
// NOTE(review): the bare `input` expression is discarded (the method returns Unit), so this
// fit does nothing.
implicit def fitVectorInfoGain[T <: Vector] = new FitOperation[InfoGainTransformer, T] {
override def fit(instance: InfoGainTransformer, fitParameters: ParameterMap, input: DataSet[T]): Unit = {
input
}
}
// Transform for LabeledVector input: currently a pass-through that returns `input` unchanged.
implicit def transformLabeledVectorsInfoGain = {
new TransformDataSetOperation[InfoGainTransformer, LabeledVector, LabeledVector] {
override def transformDataSet(
instance: InfoGainTransformer,
transformParameters: ParameterMap,
input: DataSet[LabeledVector]): DataSet[LabeledVector] = input
}
}
// Transform for vector input of type T: also a pass-through. The context bounds are part of
// the signature but are not used by the body.
implicit def transformVectorsInfoGain[T <: Vector : BreezeVectorConverter : TypeInformation : ClassTag] = {
new TransformDataSetOperation[InfoGainTransformer, T, T] {
override def transformDataSet(instance: InfoGainTransformer, transformParameters: ParameterMap, input: DataSet[T]): DataSet[T] = input
}
}
}
然后我用两种方式进行了尝试。第一种是完整的管道：
// First experiment: the full pipeline
// StandardScaler -> PolynomialFeatures -> InfoGainTransformer -> MultipleLinearRegression.
val scaler = StandardScaler()
val polyFeatures = PolynomialFeatures()
val mlr = MultipleLinearRegression()
val gain = InfoGainTransformer().setK(2)

// Construct the pipeline
val pipeline = scaler
  .chainTransformer(polyFeatures)
  .chainTransformer(gain)
  .chainPredictor(mlr)

// The pipeline must be fitted before it can predict. Fitting invokes every stage's
// FitOperation (including InfoGainTransformer's), and an unfitted MultipleLinearRegression
// has no weights to predict with.
// Assumes dataSet is a DataSet[LabeledVector] (it is mapped with `_.vector` below).
pipeline.fit(dataSet)

// predict takes only the feature vectors. print() is a sink, so this is where the lazily
// built plan, including the operators created during fit, actually executes.
val r = pipeline.predict(dataSet.map(_.vector))
r.print()
第二种只使用我的转换器：
// Second experiment: fit only. fit returns Unit. The FitOperations shown here only create
// lazy DataSet operators, which run only when a sink (e.g. print/collect) later consumes a
// DataSet that depends on them. With nothing consuming them, breakpoints inside their lambdas
// cannot be hit.
pipeline.fit(dataSet)
在这两种情况下，如果我在 fitLabeledVectorInfoGain 内设置断点（例如在 input.map 这一行），调试器都会停下来；但如果我在嵌套的 map 内部设置断点（例如 println("INSIDE!!!") 这一行），调试器从未停下。
有谁知道我该如何调试这个自定义转换器?
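现在似乎可以正常工作了。我认为问题在于我没有正确实现 FitOperation：实例状态中没有保存任何内容。更准确地说，Flink 的 DataSet 转换是惰性求值的：input.map 只会在执行计划中添加一个算子，只有当依赖它的结果被某个 sink（例如 print()）消费时才会真正执行；原来的实现丢弃了 map 的结果，因此 map 内部的代码从未运行。现在的实现如下：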
/** Fits [[InfoGainTransformer]] on labelled data by counting `Key(value, label)` occurrences,
  * and stores the resulting DataSet on the transformer instance for the transform operations.
  *
  * Flink DataSet transformations are lazy: `input.map` only adds an operator to the execution
  * plan. Its body runs, and breakpoints inside it are hit, only once a sink such as `print()`
  * consumes a DataSet that depends on it. Discarding the mapped DataSet therefore means the
  * body never runs, whereas keeping it on the instance lets later stages consume it.
  *
  * Requires the class field `counts` to be accessible from the companion object (i.e. not
  * `private[this]`) and to be typed `Option[DataSet[Map[Key, Double]]]`.
  */
implicit def fitLabeledVectorInfoGain: FitOperation[InfoGainTransformer, LabeledVector] =
  new FitOperation[InfoGainTransformer, LabeledVector] {
    override def fit(
        instance: InfoGainTransformer,
        fitParameters: ParameterMap,
        input: DataSet[LabeledVector]): Unit = {
      // One Map per record: occurrences of each Key(value, label) within that record's vector.
      // NOTE(review): counts are not aggregated across records, and the feature index is
      // dropped, so equal values in different features share a key. Confirm this is intended.
      val perRecordCounts = input.map { v =>
        v.vector.foldLeft(Map.empty[Key, Double]) { case (m, (_, value)) =>
          println("INSIDE fit!!!")
          val key = Key(value, v.label)
          m + (key -> (m.getOrElse(key, 0.0) + 1.0))
        }
      }
      instance.counts = Some(perRecordCounts)
    }
  }
现在调试器能够命中所有断点，TransformOperation 也会被调用。
我正在尝试按照 Flink 官方文档中的说明实现一个自定义 Transformer，但执行时 fit 操作似乎从未被调用。以下是我目前的实现：
/** Custom Flink ML [[Transformer]]; its companion object supplies the implicit fit/transform
  * operations, and `counts` holds the state produced by the fit operation.
  */
class InfoGainTransformer extends Transformer[InfoGainTransformer] {
  import InfoGainTransformer._

  // State written by the FitOperation in the companion object. It must not be `private[this]`
  // (object-private): that would hide it from `InfoGainTransformer.fitLabeledVectorInfoGain`.
  // Plain `private` still keeps it out of the public API, because a class and its companion
  // may access each other's private members. It holds the (lazily evaluated) DataSet that fit
  // builds, not a materialised collection.
  private var counts: Option[DataSet[Map[Key, Double]]] = None

  // here setters for params, as Flink does
}
// Companion object: parameter definitions and factory methods (elided here) plus the implicit
// fit/transform operations, which are found through the implicit scope of InfoGainTransformer.
// This is the version from the question, in which the body of `fit` appears never to run.
object InfoGainTransformer {
// ====================================== Parameters =============================================
// ...
// ==================================== Factory methods ==========================================
// ...
// ========================================== Operations =========================================
// FitOperation for LabeledVector input.
// NOTE(review): the DataSet returned by `input.map` is discarded and nothing is stored on
// `instance`. Flink DataSet transformations are lazily evaluated: `input.map` only adds an
// operator to the execution plan, and that operator runs only if a sink (e.g. print/collect)
// consumes a DataSet that depends on it. Because the result is dropped, the lambda below never
// executes. This is consistent with a breakpoint on `input.map` being hit (plan construction)
// while one inside the nested map is not.
implicit def fitLabeledVectorInfoGain = new FitOperation[InfoGainTransformer, LabeledVector] {
override def fit(instance: InfoGainTransformer, fitParameters: ParameterMap, input: DataSet[LabeledVector]): Unit = {
// NOTE(review): this local is an empty Vector, unrelated to the class field of the same name
// (nothing in this object writes that field). `counts(i)` below would throw
// IndexOutOfBoundsException if the lambda ever ran.
val counts = collection.immutable.Vector[Map[Key, Double]]()
input.map {
v =>
// Walks the (index, value) pairs of the record's feature vector.
v.vector.map {
case (i, value) =>
println("INSIDE!!!")
val key = Key(value, v.label)
// NOTE(review): the existing count is read but never incremented, and `+` on an immutable
// Map returns a new map, so `counts` itself is never updated.
val cval = counts(i).getOrElse(key, .0)
counts(i) + (key -> cval)
}
}
}
}
// FitOperation for input of any subtype T of Vector.
// NOTE(review): the bare `input` expression is discarded (the method returns Unit), so this
// fit does nothing.
implicit def fitVectorInfoGain[T <: Vector] = new FitOperation[InfoGainTransformer, T] {
override def fit(instance: InfoGainTransformer, fitParameters: ParameterMap, input: DataSet[T]): Unit = {
input
}
}
// Transform for LabeledVector input: currently a pass-through that returns `input` unchanged.
implicit def transformLabeledVectorsInfoGain = {
new TransformDataSetOperation[InfoGainTransformer, LabeledVector, LabeledVector] {
override def transformDataSet(
instance: InfoGainTransformer,
transformParameters: ParameterMap,
input: DataSet[LabeledVector]): DataSet[LabeledVector] = input
}
}
// Transform for vector input of type T: also a pass-through. The context bounds are part of
// the signature but are not used by the body.
implicit def transformVectorsInfoGain[T <: Vector : BreezeVectorConverter : TypeInformation : ClassTag] = {
new TransformDataSetOperation[InfoGainTransformer, T, T] {
override def transformDataSet(instance: InfoGainTransformer, transformParameters: ParameterMap, input: DataSet[T]): DataSet[T] = input
}
}
}
然后我用两种方式进行了尝试。第一种是完整的管道：
// First experiment: the full pipeline
// StandardScaler -> PolynomialFeatures -> InfoGainTransformer -> MultipleLinearRegression.
val scaler = StandardScaler()
val polyFeatures = PolynomialFeatures()
val mlr = MultipleLinearRegression()
val gain = InfoGainTransformer().setK(2)

// Construct the pipeline
val pipeline = scaler
  .chainTransformer(polyFeatures)
  .chainTransformer(gain)
  .chainPredictor(mlr)

// The pipeline must be fitted before it can predict. Fitting invokes every stage's
// FitOperation (including InfoGainTransformer's), and an unfitted MultipleLinearRegression
// has no weights to predict with.
// Assumes dataSet is a DataSet[LabeledVector] (it is mapped with `_.vector` below).
pipeline.fit(dataSet)

// predict takes only the feature vectors. print() is a sink, so this is where the lazily
// built plan, including the operators created during fit, actually executes.
val r = pipeline.predict(dataSet.map(_.vector))
r.print()
第二种只使用我的转换器：
// Second experiment: fit only. fit returns Unit. The FitOperations shown here only create
// lazy DataSet operators, which run only when a sink (e.g. print/collect) later consumes a
// DataSet that depends on them. With nothing consuming them, breakpoints inside their lambdas
// cannot be hit.
pipeline.fit(dataSet)
在这两种情况下，如果我在 fitLabeledVectorInfoGain 内设置断点（例如在 input.map 这一行），调试器都会停下来；但如果我在嵌套的 map 内部设置断点（例如 println("INSIDE!!!") 这一行），调试器从未停下。
有谁知道我该如何调试这个自定义转换器?
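现在似乎可以正常工作了。我认为问题在于我没有正确实现 FitOperation：实例状态中没有保存任何内容。更准确地说，Flink 的 DataSet 转换是惰性求值的：input.map 只会在执行计划中添加一个算子，只有当依赖它的结果被某个 sink（例如 print()）消费时才会真正执行；原来的实现丢弃了 map 的结果，因此 map 内部的代码从未运行。现在的实现如下：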
/** Fits [[InfoGainTransformer]] on labelled data by counting `Key(value, label)` occurrences,
  * and stores the resulting DataSet on the transformer instance for the transform operations.
  *
  * Flink DataSet transformations are lazy: `input.map` only adds an operator to the execution
  * plan. Its body runs, and breakpoints inside it are hit, only once a sink such as `print()`
  * consumes a DataSet that depends on it. Discarding the mapped DataSet therefore means the
  * body never runs, whereas keeping it on the instance lets later stages consume it.
  *
  * Requires the class field `counts` to be accessible from the companion object (i.e. not
  * `private[this]`) and to be typed `Option[DataSet[Map[Key, Double]]]`.
  */
implicit def fitLabeledVectorInfoGain: FitOperation[InfoGainTransformer, LabeledVector] =
  new FitOperation[InfoGainTransformer, LabeledVector] {
    override def fit(
        instance: InfoGainTransformer,
        fitParameters: ParameterMap,
        input: DataSet[LabeledVector]): Unit = {
      // One Map per record: occurrences of each Key(value, label) within that record's vector.
      // NOTE(review): counts are not aggregated across records, and the feature index is
      // dropped, so equal values in different features share a key. Confirm this is intended.
      val perRecordCounts = input.map { v =>
        v.vector.foldLeft(Map.empty[Key, Double]) { case (m, (_, value)) =>
          println("INSIDE fit!!!")
          val key = Key(value, v.label)
          m + (key -> (m.getOrElse(key, 0.0) + 1.0))
        }
      }
      instance.counts = Some(perRecordCounts)
    }
  }
现在调试器能够命中所有断点，TransformOperation 也会被调用。