Spark Streaming如何用mapWithState立即操作数据
Spark streaming how to immediately operate data with mapWithState
我要接受电视数据并做一些操作
首先我创建一个 Class 作为数据类型
class Customer(val customerID: Int, val TimeStamp: Int, val Channel: Int)
streaming接收数据时,我转为RDD[k,v]类型,k为customerID+TimeStamp,然后vlue为Customer对象
第一个问题是如何使用mapWithState收集所有数据?
(这样我就可以做一些统计之类的操作了)
第二个是如果我有一个函数
rateLookup(Channel, TimeStamp)
列出具有相同Channel和TimeStamp的所有数据
如何让用户在流式传输时触发此功能运行?
流能检测用户输入然后执行函数吗?
The first question is how to use mapWithState to collect all data?
您需要使用 StateSpec.function
为 mapWithState
创建匹配函数:
def analyzeCustomer(customerId: Int,
data: Option[Seq[Customer]],
state: State[Customer]): Option[Customer] = {
// Do stuff
}
其中 State[T]
是您将保存在内存中的数据的状态,数据是您将从 DStream
接收的数据。现在这样称呼它:
val spec = StateSpec.function(analyzeCustomer _)
rdd.mapWithState(spec)
how to let user trigger this function while the streaming is running?
有很多方法可以做到这一点。一种是将状态输出到持久性数据存储中,然后让用户界面从持久性存储中进行操作。这可以通过 foreachRDD
:
来完成
rdd.mapWithState(spec)
.foreachRDD(rdd => {
// Output state to persistent storage.
})
我要接受电视数据并做一些操作
首先我创建一个 Class 作为数据类型
class Customer(val customerID: Int, val TimeStamp: Int, val Channel: Int)
streaming接收数据时,我转为RDD[k,v]类型,k为customerID+TimeStamp,然后vlue为Customer对象
第一个问题是如何使用mapWithState收集所有数据? (这样我就可以做一些统计之类的操作了)
第二个是如果我有一个函数
rateLookup(Channel, TimeStamp)
列出具有相同Channel和TimeStamp的所有数据
如何让用户在流式传输时触发此功能运行?
流能检测用户输入然后执行函数吗?
The first question is how to use mapWithState to collect all data?
您需要使用 StateSpec.function
为 mapWithState
创建匹配函数:
def analyzeCustomer(customerId: Int,
data: Option[Seq[Customer]],
state: State[Customer]): Option[Customer] = {
// Do stuff
}
其中 State[T]
是您将保存在内存中的数据的状态,数据是您将从 DStream
接收的数据。现在这样称呼它:
val spec = StateSpec.function(analyzeCustomer _)
rdd.mapWithState(spec)
how to let user trigger this function while the streaming is running?
有很多方法可以做到这一点。一种是将状态输出到持久性数据存储中,然后让用户界面从持久性存储中进行操作。这可以通过 foreachRDD
:
rdd.mapWithState(spec)
.foreachRDD(rdd => {
// Output state to persistent storage.
})