如何合并两个不同类型的流 RDD

How to merge two different type streams RDDs

我需要合并两个不同的流 RDD。

stream类型的Uno是org.apache.spark.streaming.dstream.DStream[String],另外一个是org.apache.spark.streaming.dstream.DStream[twitter4j.Status].

类型

我试过:

  val streamRDD = stream.union(sentiments)

但是不会成功:

[error]  found   : org.apache.spark.streaming.dstream.DStream[String]
[error]  required: org.apache.spark.streaming.dstream.DStream[twitter4j.Status]
[error]       val streamRDD = stream.union(sentiments)
[error]                                    ^

问题是 union 只适用于相同元素类型的两个 DStream,而你有 DStream[String]DStream[twitter4j.Status]String 是不是 twitter4j.Status.

我假设你有以下类型:

val stream: DStream[twitter4j.Status]
val sentiments: DStream[String]

你有不同的选择来解决这个问题:

    1. 您确定 Stringtwitter4j.Status 应该混合为一个 DStream 因为它们在您的上下文中代表相同的信息:将 DStream 来匹配另一个

      • a) 转换stream以匹配sentiments,所以你需要一个转换twitter4j.Status => String,可能你可以像这样使用_.toString:

        val stream2 = stream.map(_.toString)
        val result = stream2.union(sentiments)
        
      • b) 转换 sentiments 以匹配 stream,需要 String => twitter4j.Status.
    1. Stringtwitter4j.Status 在您的上下文中是两个不同的东西,您想保持两者之间的区别,但仍将它们合并为一个 DStream

    一般情况下你可以用一个Sum类型来表示每种情况,这里我们只有两个,所以我们可以使用预定义的Either:

    type R = DStream[Either[String,twitter4j.Status] // shorter
    val streamL: R = stream.map(Left(_))
    val sentimentR: R = sentiments.map(Right(_))
    val result: R = streamL.union(sentimentsR)
    

    最后你会得到 一个 流,其中每个元素都是 String 包裹在 Lefttwitter4j.Status 中包裹在Right中,让您在处理流时区分两者。