使用 Spark Streaming 延迟执行数据库写入
DB writes are executed lazily with Spark Streaming
当 Spark Streaming 处于 运行 时,HBase put 不会执行,只有当我关闭 Spark 时 - 它会尝试完全执行所有 put
val inputRdd = FlumeUtils.createStream(ssc, "server", 44444)
inputRdd.foreachRDD({ rdd =>
rdd.foreachPartition(partitionOfRecords => {
val hbaseClient = new HBaseClient(zookeeper)
partitionOfRecords.foreach({ event =>
hbaseClient.put(parse(event))
hbaseClient.flush()
好的 - 我找到了我的答案 - 显然我的代码是正确的,问题是我没有留下足够的线程来处理数据
来自
http://spark.apache.org/docs/latest/streaming-programming-guide.html
"""
如果您使用基于接收器(例如套接字、Kafka、Flume 等)的输入 DStream,则单线程将用于 运行 接收器,而不会留下任何线程来处理接收到的数据数据。因此,当 运行 在本地时,始终使用“local[n]”作为主节点 URL,其中 n > 运行 的接收者数量(有关如何设置的信息,请参阅 Spark 属性大师)。
"""
使用本地[*]解决了问题
当 Spark Streaming 处于 运行 时,HBase put 不会执行,只有当我关闭 Spark 时 - 它会尝试完全执行所有 put
val inputRdd = FlumeUtils.createStream(ssc, "server", 44444)
inputRdd.foreachRDD({ rdd =>
rdd.foreachPartition(partitionOfRecords => {
val hbaseClient = new HBaseClient(zookeeper)
partitionOfRecords.foreach({ event =>
hbaseClient.put(parse(event))
hbaseClient.flush()
好的 - 我找到了我的答案 - 显然我的代码是正确的,问题是我没有留下足够的线程来处理数据
来自 http://spark.apache.org/docs/latest/streaming-programming-guide.html """ 如果您使用基于接收器(例如套接字、Kafka、Flume 等)的输入 DStream,则单线程将用于 运行 接收器,而不会留下任何线程来处理接收到的数据数据。因此,当 运行 在本地时,始终使用“local[n]”作为主节点 URL,其中 n > 运行 的接收者数量(有关如何设置的信息,请参阅 Spark 属性大师)。 """
使用本地[*]解决了问题