用于 joinWithCassandraTable 的分区键检索
Partition keys retrieval for usage with joinWithCassandraTable
我有关注 Cassandra table:
CREATE TABLE listener.snapshots_geohash
(
created_date text, -- date when record have come to the system
geo_part text, -- few signs of geo hash - just for partitioning
when timestamp, -- record creation date
device_id text, -- id of device produced json data (see snapshot column)
snapshot text, -- json data, should be aggregated by spark
PRIMARY KEY ((created_date, geo_part), when, device_id)
)
每天早上聚合应用程序应加载前一天的数据并从快照列聚合 JSON 数据。聚合将按 geohash 对数据进行分组,这就是为什么它的部分 selected 成为分区键的一部分。
我知道使用 joinWithCassandraTable 从 Cassandra 加载数据是有效的 - 但为此我必须从 (created_date, geo_part) 对构造 RDD。虽然我知道 created_date 值,但我无法列出 geo_part 值 - 因为它只是 geohash 的一部分并且它的值不是连续的。所以我不得不以某种方式 运行 select distinct created_date, geo_part from ks.snapshots
并根据其结果创建 RDD。问题是如何 运行 这个 select 与 spark 2.0.2 和 cassandra-connector 2.0.0-M3 或者可能有其他方法?
我找到了通过 运行 使用 CassandraConnector 进行 CQL 查询来获取 Cassandra 分区键的方法:
val cassandraConnector = CassandraConnector(spark.sparkContext.getConf)
val distinctRows = cassandraConnector.withSessionDo(session => {
session.execute(s"select distinct created_date, geo_part from ${keyspace}.$snapshots_table")
}).all().map(row => {TableKeyM(row.getString("created_date"), row.getString("geo_part"))}).filter(k => {days.contains(k.created_date)})
val data_x = spark.sparkContext.parallelize(distinctRows)
table结构设计存在以下问题:Cassandra不允许添加WHERE created_date='...'子句到 select 不同 created_date、geo_part 并且需要获取整个对列表并在应用程序中对其进行过滤。
替代解决方案可能是使分区键连续。如果聚合按小时完成 - 那么分区键可以是 (created_date, hour) 并且可以在应用程序中列出 24 小时。如果每天 24 个分区不够,并且聚合有 group by by by geohash,则可以坚持使用 geohash 显着部分——但它应该被翻译成一些 countable - 例如 geoPart.hash() % desiredNumberOfSubpartitions
val keys = sc.cassandraTable("listener","snapshots_geohash").select("created_date","geo_part").perPartitionLimit(1)
有关完整说明,请参阅 。
我有关注 Cassandra table:
CREATE TABLE listener.snapshots_geohash
(
created_date text, -- date when record have come to the system
geo_part text, -- few signs of geo hash - just for partitioning
when timestamp, -- record creation date
device_id text, -- id of device produced json data (see snapshot column)
snapshot text, -- json data, should be aggregated by spark
PRIMARY KEY ((created_date, geo_part), when, device_id)
)
每天早上聚合应用程序应加载前一天的数据并从快照列聚合 JSON 数据。聚合将按 geohash 对数据进行分组,这就是为什么它的部分 selected 成为分区键的一部分。
我知道使用 joinWithCassandraTable 从 Cassandra 加载数据是有效的 - 但为此我必须从 (created_date, geo_part) 对构造 RDD。虽然我知道 created_date 值,但我无法列出 geo_part 值 - 因为它只是 geohash 的一部分并且它的值不是连续的。所以我不得不以某种方式 运行 select distinct created_date, geo_part from ks.snapshots
并根据其结果创建 RDD。问题是如何 运行 这个 select 与 spark 2.0.2 和 cassandra-connector 2.0.0-M3 或者可能有其他方法?
我找到了通过 运行 使用 CassandraConnector 进行 CQL 查询来获取 Cassandra 分区键的方法:
val cassandraConnector = CassandraConnector(spark.sparkContext.getConf)
val distinctRows = cassandraConnector.withSessionDo(session => {
session.execute(s"select distinct created_date, geo_part from ${keyspace}.$snapshots_table")
}).all().map(row => {TableKeyM(row.getString("created_date"), row.getString("geo_part"))}).filter(k => {days.contains(k.created_date)})
val data_x = spark.sparkContext.parallelize(distinctRows)
table结构设计存在以下问题:Cassandra不允许添加WHERE created_date='...'子句到 select 不同 created_date、geo_part 并且需要获取整个对列表并在应用程序中对其进行过滤。
替代解决方案可能是使分区键连续。如果聚合按小时完成 - 那么分区键可以是 (created_date, hour) 并且可以在应用程序中列出 24 小时。如果每天 24 个分区不够,并且聚合有 group by by by geohash,则可以坚持使用 geohash 显着部分——但它应该被翻译成一些 countable - 例如 geoPart.hash() % desiredNumberOfSubpartitions
val keys = sc.cassandraTable("listener","snapshots_geohash").select("created_date","geo_part").perPartitionLimit(1)
有关完整说明,请参阅