如果在 Spark Streaming 中
If else in Spark Streaming
我有一个 Spark Streaming 应用程序,它从 Kafka 中的 SINGLE TOPIC 读取数据,对其进行处理,然后根据元素的内容将其插入到 Cassandra 中的 2 个不同的键空间中。一些数据可能进入键空间 A,另一些数据进入键空间 B。
我目前使用的是过滤操作:
Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, table = "tableName")
Functions.insertToCassandra(rdd.filter(element => element.tenant=="B"), keyspace = B, table = "tableName")
因此过滤器应用于每个rdd,具有租户字段A的元素进入键空间A,具有租户字段B的元素进入键空间B。
有没有更有效的方法来做到这一点,而不是应用过滤操作 2 次(特别是因为以后可能会有超过 2 个键空间)?在过滤操作之前缓存 rdd 会提高性能吗?
我重复一遍,我有来自 Kafka 的 DStream,我处理它,然后在 "foreachRDD" 操作中我有上面的代码片段,它正在向 Cassandra 插入数据。
谢谢
在你做之前
Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, "tableName")
Functions.insertToCassandra(rdd.filter(element=> element.tenant=="B"), keyspace = B, "tableName")
确保执行 rdd.cache()
当你像上面那样做时,你的 spark 正在尝试读取 rdd 数据两次。
Spark 永远不会在内存中保留任何 rdd,除非你缓存或广播它。
另一种方法是一次读取所有数据,缓存它,如果数据集不是很大的话。然后使用 groupByKey,在这种情况下,键将是您的键空间(元素)。
我有一个 Spark Streaming 应用程序,它从 Kafka 中的 SINGLE TOPIC 读取数据,对其进行处理,然后根据元素的内容将其插入到 Cassandra 中的 2 个不同的键空间中。一些数据可能进入键空间 A,另一些数据进入键空间 B。
我目前使用的是过滤操作:
Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, table = "tableName")
Functions.insertToCassandra(rdd.filter(element => element.tenant=="B"), keyspace = B, table = "tableName")
因此过滤器应用于每个rdd,具有租户字段A的元素进入键空间A,具有租户字段B的元素进入键空间B。
有没有更有效的方法来做到这一点,而不是应用过滤操作 2 次(特别是因为以后可能会有超过 2 个键空间)?在过滤操作之前缓存 rdd 会提高性能吗?
我重复一遍,我有来自 Kafka 的 DStream,我处理它,然后在 "foreachRDD" 操作中我有上面的代码片段,它正在向 Cassandra 插入数据。
谢谢
在你做之前
Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, "tableName")
Functions.insertToCassandra(rdd.filter(element=> element.tenant=="B"), keyspace = B, "tableName")
确保执行 rdd.cache()
当你像上面那样做时,你的 spark 正在尝试读取 rdd 数据两次。 Spark 永远不会在内存中保留任何 rdd,除非你缓存或广播它。
另一种方法是一次读取所有数据,缓存它,如果数据集不是很大的话。然后使用 groupByKey,在这种情况下,键将是您的键空间(元素)。