Spark Streaming 与 mapGroupsWithState
Spark Streaming with mapGroupsWithState
我正在编写一个有状态流应用程序,我在其中使用 mapGroupsWithState 为组创建聚合,但我需要根据输入行中的多个列创建组。 'Spark: The Definitive Guide' 中的所有示例仅使用一列,例如 'User' 或 'Device'。我正在使用类似于下面给出的代码。 如何在 'groupByKey' 中指定多个字段?
还有其他挑战。书上说我们可以使用下面给出的方式 'updateAcrossEvents' 但我得到编译时错误说: Error:(43, 65) missing argument list for method updateAcrossEvents in object Main
未应用的方法仅在需要函数类型时才转换为函数。
您可以通过编写 updateAcrossEvents _
或 updateAcrossEvents(_,_,_,_,_)
而不是 updateAcrossEvents
来明确此转换。
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
另一个挑战:编译器还抱怨我的 MyReport:错误:(41、12) 无法找到存储在数据集中的类型的编码器。通过导入支持原始类型(Int、String 等)和产品类型(case 类)spark.implicits._ 将在未来版本中添加对序列化其他类型的支持。
如果能帮助解决这些错误,我们将不胜感激。提前致谢。
withEventTime
.as[MyReport]
.groupByKey(_.getKeys.getKey1). // How do I add _.getKeys.getKey2?
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
.writeStream
.queryName("test_query")
.format("memory")
.outputMode("update")
.start()
更新跨事件:
def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[MyReport], oldState: GroupState[MyState]): MyState = {
var state: MyState = if (oldState.exists) oldState.get else MyState.getNewState(tuple3._1, tuple3._2, tuple3._3)
for (input <- inputs) {
state = updateWithEvent(state, input)
oldState.update(state)
}
state
}
updateWithEvent:
def updateWithEvent(state: MyState, report: MyReport): MyState = {
state.someField1 = state.someField1 ++ Array(report.getSomeField1.longValue())
state.someField2 = state.someField2 ++ Array(report.getSomeField2.longValue())
state
}
您可以形成一个键元组 - 检查此代码:
withEventTime
.as[MyReport]
.groupByKey(row => (row.getKeys.getKey1,row.getKeys.getKey2))
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
.writeStream
.queryName("test_query")
.format("memory")
.outputMode("update")
.start()
现在您获得了一个唯一的组(getKey1,getKey2)组合。您可能必须相应地更改您的更新功能。
第二个问题:
是的,spark 默认只支持 case class 和原始类型。
要消除此错误,请确保 "MyReport" 是一个案例 class 并在上述代码之前使用以下代码导入隐式:
import <your_spark_session_variable>.implicits._
我正在编写一个有状态流应用程序,我在其中使用 mapGroupsWithState 为组创建聚合,但我需要根据输入行中的多个列创建组。 'Spark: The Definitive Guide' 中的所有示例仅使用一列,例如 'User' 或 'Device'。我正在使用类似于下面给出的代码。 如何在 'groupByKey' 中指定多个字段?
还有其他挑战。书上说我们可以使用下面给出的方式 'updateAcrossEvents' 但我得到编译时错误说: Error:(43, 65) missing argument list for method updateAcrossEvents in object Main
未应用的方法仅在需要函数类型时才转换为函数。
您可以通过编写 updateAcrossEvents _
或 updateAcrossEvents(_,_,_,_,_)
而不是 updateAcrossEvents
来明确此转换。
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
另一个挑战:编译器还抱怨我的 MyReport:错误:(41、12) 无法找到存储在数据集中的类型的编码器。通过导入支持原始类型(Int、String 等)和产品类型(case 类)spark.implicits._ 将在未来版本中添加对序列化其他类型的支持。
如果能帮助解决这些错误,我们将不胜感激。提前致谢。
withEventTime
.as[MyReport]
.groupByKey(_.getKeys.getKey1). // How do I add _.getKeys.getKey2?
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
.writeStream
.queryName("test_query")
.format("memory")
.outputMode("update")
.start()
更新跨事件:
def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[MyReport], oldState: GroupState[MyState]): MyState = {
var state: MyState = if (oldState.exists) oldState.get else MyState.getNewState(tuple3._1, tuple3._2, tuple3._3)
for (input <- inputs) {
state = updateWithEvent(state, input)
oldState.update(state)
}
state
}
updateWithEvent:
def updateWithEvent(state: MyState, report: MyReport): MyState = {
state.someField1 = state.someField1 ++ Array(report.getSomeField1.longValue())
state.someField2 = state.someField2 ++ Array(report.getSomeField2.longValue())
state
}
您可以形成一个键元组 - 检查此代码:
withEventTime
.as[MyReport]
.groupByKey(row => (row.getKeys.getKey1,row.getKeys.getKey2))
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
.writeStream
.queryName("test_query")
.format("memory")
.outputMode("update")
.start()
现在您获得了一个唯一的组(getKey1,getKey2)组合。您可能必须相应地更改您的更新功能。
第二个问题:
是的,spark 默认只支持 case class 和原始类型。
要消除此错误,请确保 "MyReport" 是一个案例 class 并在上述代码之前使用以下代码导入隐式:
import <your_spark_session_variable>.implicits._