如何合并两个不同类型的流 RDD
How to merge two different type streams RDDs
我需要合并两个不同的流 RDD。
stream类型的Uno是org.apache.spark.streaming.dstream.DStream[String],另外一个是org.apache.spark.streaming.dstream.DStream[twitter4j.Status].
类型
我试过:
val streamRDD = stream.union(sentiments)
但是不会成功:
[error] found : org.apache.spark.streaming.dstream.DStream[String]
[error] required: org.apache.spark.streaming.dstream.DStream[twitter4j.Status]
[error] val streamRDD = stream.union(sentiments)
[error] ^
问题是 union
只适用于相同元素类型的两个 DStream
,而你有 DStream[String]
和 DStream[twitter4j.Status]
和 String
是不是 twitter4j.Status
.
我假设你有以下类型:
val stream: DStream[twitter4j.Status]
val sentiments: DStream[String]
你有不同的选择来解决这个问题:
您确定 String
和 twitter4j.Status
应该混合为一个 DStream
因为它们在您的上下文中代表相同的信息:将 DStream
来匹配另一个
a) 转换stream
以匹配sentiments
,所以你需要一个转换twitter4j.Status => String
,可能你可以像这样使用_.toString
:
val stream2 = stream.map(_.toString)
val result = stream2.union(sentiments)
- b) 转换
sentiments
以匹配 stream
,需要 String => twitter4j.Status
.
String
和 twitter4j.Status
在您的上下文中是两个不同的东西,您想保持两者之间的区别,但仍将它们合并为一个 DStream
一般情况下你可以用一个Sum
类型来表示每种情况,这里我们只有两个,所以我们可以使用预定义的Either
:
type R = DStream[Either[String,twitter4j.Status] // shorter
val streamL: R = stream.map(Left(_))
val sentimentR: R = sentiments.map(Right(_))
val result: R = streamL.union(sentimentsR)
最后你会得到 一个 流,其中每个元素都是 String
包裹在 Left
或 twitter4j.Status
中包裹在Right
中,让您在处理流时区分两者。
我需要合并两个不同的流 RDD。
stream类型的Uno是org.apache.spark.streaming.dstream.DStream[String],另外一个是org.apache.spark.streaming.dstream.DStream[twitter4j.Status].
类型我试过:
val streamRDD = stream.union(sentiments)
但是不会成功:
[error] found : org.apache.spark.streaming.dstream.DStream[String]
[error] required: org.apache.spark.streaming.dstream.DStream[twitter4j.Status]
[error] val streamRDD = stream.union(sentiments)
[error] ^
问题是 union
只适用于相同元素类型的两个 DStream
,而你有 DStream[String]
和 DStream[twitter4j.Status]
和 String
是不是 twitter4j.Status
.
我假设你有以下类型:
val stream: DStream[twitter4j.Status]
val sentiments: DStream[String]
你有不同的选择来解决这个问题:
您确定
String
和twitter4j.Status
应该混合为一个DStream
因为它们在您的上下文中代表相同的信息:将DStream
来匹配另一个a) 转换
stream
以匹配sentiments
,所以你需要一个转换twitter4j.Status => String
,可能你可以像这样使用_.toString
:val stream2 = stream.map(_.toString) val result = stream2.union(sentiments)
- b) 转换
sentiments
以匹配stream
,需要String => twitter4j.Status
.
String
和twitter4j.Status
在您的上下文中是两个不同的东西,您想保持两者之间的区别,但仍将它们合并为一个DStream
一般情况下你可以用一个
Sum
类型来表示每种情况,这里我们只有两个,所以我们可以使用预定义的Either
:type R = DStream[Either[String,twitter4j.Status] // shorter val streamL: R = stream.map(Left(_)) val sentimentR: R = sentiments.map(Right(_)) val result: R = streamL.union(sentimentsR)
最后你会得到 一个 流,其中每个元素都是
String
包裹在Left
或twitter4j.Status
中包裹在Right
中,让您在处理流时区分两者。