spark-streaming scala:如何将字符串数组传递给过滤器?
spark-streaming scala: how can I pass an array of strings to a filter?
我想将字符串 "a" 替换为字符串数组,使 .contains() 检查数组中的每个字符串。这可能吗?
val filtered = stream.flatMap(status => status.getText.split(" ").filter(_.contains("a")))
编辑:
也试过这个(sc 是 sparkContext):
val ssc = new StreamingContext(sc, Seconds(15))
val stream = TwitterUtils.createStream(ssc, None)
val filtered = stream.flatMap(status => status.getText.split(" ").filter(a.contains(_)))
并出现以下错误:
java.io.NotSerializableException:org.apache.spark.streaming.twitter.TwitterInputDStream 的对象可能作为 RDD 操作关闭的一部分被序列化。这是因为 DStream 对象是从闭包中引用的。请重写此 DStream 内部的 RDD 操作以避免这种情况。强制执行此操作是为了避免 Spark 任务因不必要的对象而膨胀。
然后我尝试在使用之前广播数组:
val aBroadcast = sc.broadcast(a)
val filtered = stream.flatMap(status => status.getText.split(" ").filter(aBroadcast.value.contains(_)))
同样的错误。
谢谢
据我了解,您想查看拆分后的状态文本是否包含单词列表,该单词列表是 a
:
的子集
val a = Array("a1", "a2")
val filtered = stream.flatMap(status => status.getText.split(" ").filter(_.forall(a contains))
我想将字符串 "a" 替换为字符串数组,使 .contains() 检查数组中的每个字符串。这可能吗?
val filtered = stream.flatMap(status => status.getText.split(" ").filter(_.contains("a")))
编辑:
也试过这个(sc 是 sparkContext):
val ssc = new StreamingContext(sc, Seconds(15))
val stream = TwitterUtils.createStream(ssc, None)
val filtered = stream.flatMap(status => status.getText.split(" ").filter(a.contains(_)))
并出现以下错误:
java.io.NotSerializableException:org.apache.spark.streaming.twitter.TwitterInputDStream 的对象可能作为 RDD 操作关闭的一部分被序列化。这是因为 DStream 对象是从闭包中引用的。请重写此 DStream 内部的 RDD 操作以避免这种情况。强制执行此操作是为了避免 Spark 任务因不必要的对象而膨胀。
然后我尝试在使用之前广播数组:
val aBroadcast = sc.broadcast(a)
val filtered = stream.flatMap(status => status.getText.split(" ").filter(aBroadcast.value.contains(_)))
同样的错误。
谢谢
据我了解,您想查看拆分后的状态文本是否包含单词列表,该单词列表是 a
:
val a = Array("a1", "a2")
val filtered = stream.flatMap(status => status.getText.split(" ").filter(_.forall(a contains))