Spark Streaming 与 mapGroupsWithState

Spark Streaming with mapGroupsWithState

我正在编写一个有状态流应用程序,我在其中使用 mapGroupsWithState 为组创建聚合,但我需要根据输入行中的多个列创建组。 'Spark: The Definitive Guide' 中的所有示例仅使用一列,例如 'User' 或 'Device'。我正在使用类似于下面给出的代码。 如何在 'groupByKey' 中指定多个字段?

还有其他挑战。书上说我们可以使用下面给出的方式 'updateAcrossEvents' 但我得到编译时错误说: Error:(43, 65) missing argument list for method updateAcrossEvents in object Main 未应用的方法仅在需要函数类型时才转换为函数。 您可以通过编写 updateAcrossEvents _updateAcrossEvents(_,_,_,_,_) 而不是 updateAcrossEvents 来明确此转换。 .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)

另一个挑战:编译器还抱怨我的 MyReport:错误:(41、12) 无法找到存储在数据集中的类型的编码器。通过导入支持原始类型(Int、String 等)和产品类型(case 类)spark.implicits._ 将在未来版本中添加对序列化其他类型的支持。

如果能帮助解决这些错误,我们将不胜感激。提前致谢。

withEventTime
 .as[MyReport]
 .groupByKey(_.getKeys.getKey1). // How do I add _.getKeys.getKey2?
 .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
 .writeStream
 .queryName("test_query")
 .format("memory")
 .outputMode("update")
 .start()

更新跨事件:

def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[MyReport], oldState: GroupState[MyState]): MyState = {

 var state: MyState = if (oldState.exists) oldState.get else MyState.getNewState(tuple3._1, tuple3._2, tuple3._3)

 for (input <- inputs) {
 state = updateWithEvent(state, input)
 oldState.update(state)
 }

 state
}

updateWithEvent:

def updateWithEvent(state: MyState, report: MyReport): MyState = {

 state.someField1 = state.someField1 ++ Array(report.getSomeField1.longValue())
 state.someField2 = state.someField2 ++ Array(report.getSomeField2.longValue())

 state
}

您可以形成一个键元组 - 检查此代码:

withEventTime
 .as[MyReport]
 .groupByKey(row => (row.getKeys.getKey1,row.getKeys.getKey2)) 
 .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
 .writeStream
 .queryName("test_query")
 .format("memory")
 .outputMode("update")
 .start()

现在您获得了一个唯一的组(getKey1,getKey2)组合。您可能必须相应地更改您的更新功能。

第二个问题:

是的,spark 默认只支持 case class 和原始类型。

要消除此错误,请确保 "MyReport" 是一个案例 class 并在上述代码之前使用以下代码导入隐式:

import <your_spark_session_variable>.implicits._