如何有效地从 Cassandra 读取数百万行数据?

How to effectively read millions of rows from Cassandra?

我很难从 Cassandra table 中读取数百万行。实际上这个 table 包含大约 40~50 百万行。 数据实际上是我们系统的内部 URL,我们需要触发所有这些。为了启动它,我们正在使用 Akka Streams,它一直工作得很好,根据需要做一些背压。但是我们还没有找到有效阅读所有内容的方法。

到目前为止我们已经尝试过的:

代码如下:

val cassandraRdd =
      sc
        .cassandraTable("keyspace", "my_table")
        .select("id", "url")
        .where("year = ? and month = ? and day = ?", date.getYear, date.getMonthOfYear, date.getDayOfMonth)

不幸的是,我无法遍历分区以获取更少的数据,我必须使用收集,因为它会抱怨 actor 不可序列化。

val httpPool: Flow[(HttpRequest, String), (Try[HttpResponse], String), HostConnectionPool] = Http().cachedHostConnectionPool[String](host, port).async

val source =
  Source
    .actorRef[CassandraRow](10000000, OverflowStrategy.fail)
    .map(row => makeUrl(row.getString("id"), row.getString("url")))
    .map(url => HttpRequest(uri = url) -> url)

val ref = Flow[(HttpRequest, String)]
  .via(httpPool.withAttributes(ActorAttributes.supervisionStrategy(decider)))
  .to(Sink.actorRef(httpHandlerActor, IsDone))
  .runWith(source)

cassandraRdd.collect().foreach { row =>
  ref ! row
}

我想知道你们中是否有人有过读取数百万行以执行与聚合等不同的操作的经验。

我也想过阅读所有内容并发送到 Kafka 主题,在那里我将使用 Streaming(spark 或 Akka)接收,但问题是一样的,如何有效地加载所有这些数据?

编辑

现在,我 运行 在一个具有合理内存量 100GB 的集群上,并对其进行收集和迭代。

此外,这与使用 spark 获取大数据并使用 reduceByKey、aggregateByKey 等方法分析它有很大不同

我需要通过 HTTP 获取和发送所有内容 =/

到目前为止,它按照我的方式工作,但我担心这些数据会变得越来越大,以至于将所有内容都提取到内存中毫无意义。

流式传输这些数据是最好的解决方案,分块获取,但我还没有找到好的方法。

最后,我想用 Spark 来获取所有这些数据,生成一个 CSV 文件并使用 Akka Stream IO 来处理,这样我会驱逐以在内存中保留很多东西,因为它处理每一百万需要数小时。

好吧,在花一些时间阅读、与其他人交谈并进行测试之后,结果可以通过以下代码示例实现:

val sc = new SparkContext(sparkConf)

val cassandraRdd = sc.cassandraTable(config.getString("myKeyspace"), "myTable")
  .select("key", "value")
  .as((key: String, value: String) => (key, value))
  .partitionBy(new HashPartitioner(2 * sc.defaultParallelism))
  .cache()

cassandraRdd
  .groupByKey()
  .foreachPartition { partition =>
    partition.foreach { row =>

      implicit val system = ActorSystem()
      implicit val materializer = ActorMaterializer()

      val myActor = system.actorOf(Props(new MyActor(system)), name = "my-actor")

      val source = Source.fromIterator { () => row._2.toIterator }
      source
        .map { str =>
          myActor ! Count
          str
        }
        .to(Sink.actorRef(myActor, Finish))
        .run()
    }
  }

sc.stop()


class MyActor(system: ActorSystem) extends Actor {

  var count = 0

  def receive = {

    case Count =>
      count = count + 1

    case Finish =>
      println(s"total: $count")
      system.shutdown()

  }
}

case object Count
case object Finish

我正在做的是:

  • 尝试使用 partitionBy 和 groupBy 方法获得大量的分区和分区器
  • 使用缓存来防止 Data Shuffle,让你的 Spark 跨节点移动大数据,使用高 IO 等
  • 创建整个 actor 系统及其依赖项以及 foreachPartition 方法中的 Stream。这是一个权衡,你只能有一个 ActorSystem 但你将不得不像我在问题中写的那样滥用 .collect 。然而,在内部创建所有内容,您仍然能够 运行 内部的内容在您的集群中分布。
  • 使用 Sink.actorRef 在迭代器的末尾完成每个 actor 系统并使用一条消息 kill(Finish)

也许这段代码可以进一步改进,但到目前为止我很高兴不再使用 .collect 并且只在 Spark 中工作。