DStreams: Variable created within foreachRDD and then modified inside foreachPartition is reset once outside of foreachPartition?
I have a bunch of messages in Kafka and I'm using Spark Streaming to process them.

I'm trying to catch the cases where my code fails to insert into my database, then take those messages and push them back into Kafka so I can process them later.

To do this, I created a variable called "success" inside my foreachRDD function. When I attempt the database update, I return a boolean indicating whether the insert succeeded. What I noticed during testing is that this doesn't work when the insert happens inside foreachPartition: the value of success appears to get "reset" as soon as I leave the foreachPartition function.
stream: DStream[String]

stream
  .foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      var success = true
      rdd.foreachPartition(partitionOfRecords => {
        if (partitionOfRecords.nonEmpty) {
          val listOfRecords = partitionOfRecords.toList
          val successfulInsert: Boolean = insertRecordsToDB(listOfRecords)
          logger.info("Insert was successful: " + successfulInsert)
          if (!successfulInsert) {
            logger.info("logging successful as false. Currently its set to: " + success)
            success = false
            logger.info("logged successful as false. Currently its set to: " + success)
          }
        }
      })
      logger.info("Insert into database successful from all partition: " + success)
      if (!success) {
        // send data to Kafka topic
      }
    }
  })
My log output then shows this:

2019-06-24 20:26:37 [INFO] Insert was successful: false
2019-06-24 20:26:37 [INFO] logging successful as false. Currently its set to: true
2019-06-24 20:26:37 [INFO] logged successful as false. Currently its set to: false
2019-06-24 20:26:37 [INFO] Insert into database successful from all partition: true
Even though the third log line says "success" is currently set to false, when I leave foreachPartition and log it again it is back to true.

Can anyone explain why? Or suggest a different approach? A stripped-down version of the same pattern is below.
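The same behaviour shows up without Kafka or the database involved at all. A minimal sketch (the sample data and the stand-in for a failed insert are just placeholders for illustration; it can be pasted into spark-shell run against the cluster):

// Stripped-down repro; `sc` is the shell's SparkContext.
val rdd = sc.parallelize(1 to 10, 2)

var success = true

rdd.foreachPartition { partitionOfRecords =>
  if (partitionOfRecords.nonEmpty) {
    // stand-in for a failed insertRecordsToDB(...) call
    success = false
  }
}

// Back on the driver this still prints true, matching the last log line above.
println("success after foreachPartition: " + success)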
I was able to get it working using an accumulator. The underlying issue is that the closure passed to foreachPartition is serialized and shipped to the executors, so the "success" that gets set to false there is a task-side copy; the driver's copy, which is what gets logged and checked after foreachPartition, is never touched. Accumulators are Spark's mechanism for aggregating values from the executors back to the driver, which is what is needed here.
stream: DStream[String]

val dbInsertACC = sparkSession.sparkContext.longAccumulator("insertSuccess")

stream
  .foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      // could maybe put accumulator here?
      rdd.foreachPartition(partitionOfRecords => {
        if (partitionOfRecords.nonEmpty) {
          val listOfRecords = partitionOfRecords.toList
          val successfulInsert: Boolean = insertRecordsToDB(listOfRecords)
          logger.info("Insert was successful: " + successfulInsert)
          if (!successfulInsert) dbInsertACC.add(1)
        }
      })
      logger.info("Insert into database successful from all partition: " + dbInsertACC.isZero)
      if (!dbInsertACC.isZero) {
        // send data to Kafka topic
      }
    }
  })
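One detail to watch: the accumulator is created once and keeps its value across batches, so after a single failed batch, !dbInsertACC.isZero stays true and every later batch would also get sent back to Kafka. A sketch of per-batch handling, assuming LongAccumulator's reset() (inherited from AccumulatorV2) simply zeroes the driver-side value:

stream
  .foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      // Clear whatever was accumulated in the previous batch (runs on the driver).
      dbInsertACC.reset()

      rdd.foreachPartition(partitionOfRecords => {
        if (partitionOfRecords.nonEmpty) {
          val listOfRecords = partitionOfRecords.toList
          if (!insertRecordsToDB(listOfRecords)) dbInsertACC.add(1)
        }
      })

      // The accumulator now only counts failed partition inserts from this batch.
      logger.info("Failed partition inserts in this batch: " + dbInsertACC.value)
      if (!dbInsertACC.isZero) {
        // send data to Kafka topic
      }
    }
  })

Registering a fresh accumulator inside foreachRDD (at the "could maybe put accumulator here?" comment) would work too, since foreachRDD runs on the driver, but reusing one and resetting it per batch avoids creating a new accumulator every interval.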