如果在 Spark Streaming 中

If else in Spark Streaming

我有一个 Spark Streaming 应用程序,它从 Kafka 中的 SINGLE TOPIC 读取数据,对其进行处理,然后根据元素的内容将其插入到 Cassandra 中的 2 个不同的键空间中。一些数据可能进入键空间 A,另一些数据进入键空间 B。

我目前使用的是过滤操作:

Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, table = "tableName")
Functions.insertToCassandra(rdd.filter(element => element.tenant=="B"), keyspace = B, table = "tableName")

因此过滤器应用于每个rdd,具有租户字段A的元素进入键空间A,具有租户字段B的元素进入键空间B。

有没有更有效的方法来做到这一点,而不是应用过滤操作 2 次(特别是因为以后可能会有超过 2 个键空间)?在过滤操作之前缓存 rdd 会提高性能吗?

我重复一遍,我有来自 Kafka 的 DStream,我处理它,然后在 "foreachRDD" 操作中我有上面的代码片段,它正在向 Cassandra 插入数据。

谢谢

在你做之前

Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, "tableName")
Functions.insertToCassandra(rdd.filter(element=> element.tenant=="B"), keyspace = B, "tableName")

确保执行 rdd.cache()

当你像上面那样做时,你的 spark 正在尝试读取 rdd 数据两次。 Spark 永远不会在内存中保留任何 rdd,除非你缓存或广播它。

另一种方法是一次读取所有数据,缓存它,如果数据集不是很大的话。然后使用 groupByKey,在这种情况下,键将是您的键空间(元素)。