50 minute difference between batch time and submission time in spark streaming
Spark version is 2.2.0.
Pseudocode:
Read data1 from Kafka with a 5-minute window
Read data2 from Kafka with a 10-minute window and a 5-minute slide duration
Join data1 with data2 on some conditions
Do some aggregation and write the results to MySQL
Problem:
The batch time is 15:00 but the submission time is 15:50, and the processing time is under 1 minute. What happened?
val shareDs = KafkaUtils.createDirectStream[String, String](streamContext, LocationStrategies.PreferBrokers, shareReqConsumer)
val shareResDS = KafkaUtils.createDirectStream[String, String](streamContext, LocationStrategies.PreferBrokers, shareResConsumer)
  .window(Minutes(WindowTime), Minutes(StreamTime))
// pseudocode: map both streams, join them, then write each batch to MySQL
(shareDs doSomeMap) join (shareResDS doSomeMap) foreachRDD { do some things, then write to MySQL }
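To make the pseudocode concrete, a minimal sketch of the join-and-write step is shown below. The key/value extraction, the groupByKey aggregation and the writeToMysql helper are illustrative placeholders, not code from the original job:

// illustrative "doSomeMap": extract (key, value) pairs from the Kafka records
val reqPairs = shareDs.map(r => (r.key(), r.value()))
val resPairs = shareResDS.map(r => (r.key(), r.value()))

// join data1 with data2 by key, then write each batch out
reqPairs.join(resPairs).foreachRDD { rdd =>
  val aggregated = rdd.groupByKey()      // "do some aggregation" (placeholder)
  aggregated.foreachPartition { rows =>
    writeToMysql(rows)                   // hypothetical helper: write one partition over JDBC
  }
}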
Here are some logs:
19/07/22 11:20:00 INFO dstream.MappedDStream: Slicing from 1563765000000 ms to 1563765600000 ms (aligned to 1563765000000 ms and 1563765600000 ms)
19/07/22 11:20:00 INFO dstream.MappedDStream: Slicing from 1563765000000 ms to 1563765600000 ms (aligned to 1563765000000 ms and 1563765600000 ms)
19/07/22 11:20:00 INFO dstream.MappedDStream: Slicing from 1563765000000 ms to 1563765600000 ms (aligned to 1563765000000 ms and 1563765600000 ms)
19/07/22 11:20:00 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-6, groupId=dashboard] Revoking previously assigned partitions [topic_wh_sparkstream_afp_com_input_result-2, topic_wh_sparkstream_afp_com_input_result-1, topic_wh_sparkstream_afp_com_input_result-0]
19/07/22 11:20:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-6, groupId=dashboard] (Re-)joining group
19/07/22 11:25:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-6, groupId=dashboard] Successfully joined group with generation 820
19/07/22 11:25:00 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-6, groupId=dashboard] Setting newly assigned partitions [topic_wh_sparkstream_afp_com_input_result-2, topic_wh_sparkstream_afp_com_input_result-1, topic_wh_sparkstream_afp_com_input_result-0]
19/07/22 11:25:00 INFO dstream.MappedDStream: Slicing from 1563765000000 ms to 1563765600000 ms (aligned to 1563765000000 ms and 1563765600000 ms)
19/07/22 11:25:00 INFO dstream.MappedDStream: Slicing from 1563765000000 ms to 1563765600000 ms (aligned to 1563765000000 ms and 1563765600000 ms)
19/07/22 11:25:00 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-5, groupId=dashboard] Revoking previously assigned partitions [topic_wh_sparkstream_decision_report_result-1, topic_wh_sparkstream_decision_report_result-2, topic_wh_sparkstream_decision_report_result-0]
19/07/22 11:25:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-5, groupId=dashboard] (Re-)joining group
19/07/22 11:30:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-5, groupId=dashboard] Successfully joined group with generation 821
19/07/22 11:30:00 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-5, groupId=dashboard] Setting newly assigned partitions [topic_wh_sparkstream_decision_report_result-1, topic_wh_sparkstream_decision_report_result-2, topic_wh_sparkstream_decision_report_result-0]
19/07/22 11:30:00 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-4, groupId=dashboard] Revoking previously assigned partitions [topic_wh_sparkstream_echo_mixed_risk_record-1, topic_wh_sparkstream_echo_mixed_risk_record-2, topic_wh_sparkstream_echo_mixed_risk_record-0]
19/07/22 11:30:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-4, groupId=dashboard] (Re-)joining group
19/07/22 11:30:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-4, groupId=dashboard] Marking the coordinator 10.124.35.112:9092 (id: 2147483534 rack: null) dead
19/07/22 11:30:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-4, groupId=dashboard] Discovered group coordinator 10.124.35.112:9092 (id: 2147483534 rack: null)
19/07/22 11:30:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-4, groupId=dashboard] (Re-)joining group
19/07/22 11:35:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-4, groupId=dashboard] Successfully joined group with generation 822
19/07/22 11:35:00 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-4, groupId=dashboard] Setting newly assigned partitions [topic_wh_sparkstream_echo_mixed_risk_record-1, topic_wh_sparkstream_echo_mixed_risk_record-2, topic_wh_sparkstream_echo_mixed_risk_record-0]
19/07/22 11:35:00 INFO dstream.MappedDStream: Slicing from 1563765000000 ms to 1563765600000 ms (aligned to 1563765000000 ms and 1563765600000 ms)
19/07/22 11:35:00 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-3, groupId=dashboard] Revoking previously assigned partitions [topic_wh_sparkstream_echo_mixed_risk_result_detail-2, topic_wh_sparkstream_echo_mixed_risk_result_detail-1, topic_wh_sparkstream_echo_mixed_risk_result_detail-0, topic_wh_sparkstream_echo_behavior_features_result-0, topic_wh_sparkstream_echo_behavior_features_result-1, topic_wh_sparkstream_echo_behavior_features_result-2]
19/07/22 11:35:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-3, groupId=dashboard] (Re-)joining group
19/07/22 11:35:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-3, groupId=dashboard] Marking the coordinator 10.124.35.112:9092 (id: 2147483534 rack: null) dead
19/07/22 11:35:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-3, groupId=dashboard] Discovered group coordinator 10.124.35.112:9092 (id: 2147483534 rack: null)
19/07/22 11:35:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-3, groupId=dashboard] (Re-)joining group
At the window timestamps, only a Kafka partition rebalance happens and no job is added.
I solved this problem.
When using Spark Streaming with Kafka, configure each stream with a separate group_id and disable auto-commit,
and tune the Kafka parameters appropriately, in particular the heartbeat interval, session timeout, request timeout, and max poll interval.
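For reference, a minimal sketch of this kind of consumer configuration in Scala. The broker address, group id, and all timeout values below are placeholders, and pairing this stream with the topic shown in the logs is an assumption:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// hypothetical Kafka parameters for one of the two streams; every literal is an example value
val shareReqParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  // a separate group.id per direct stream, so the streams do not rebalance each other's group
  "group.id" -> "dashboard_share_req",
  // disable auto-commit; commit offsets manually once the batch has been written
  "enable.auto.commit" -> (false: java.lang.Boolean),
  // the rebalance-related timeouts named above (example values only)
  "heartbeat.interval.ms" -> (10000: java.lang.Integer),
  "session.timeout.ms" -> (30000: java.lang.Integer),
  "request.timeout.ms" -> (40000: java.lang.Integer),
  "max.poll.interval.ms" -> (600000: java.lang.Integer)
)

// consumer strategy handed to createDirectStream
val shareReqConsumer = ConsumerStrategies.Subscribe[String, String](Seq("topic_wh_sparkstream_afp_com_input_result"), shareReqParams)

The second stream gets its own parameter map with a different group.id, so a rebalance in one consumer group (the repeated "(Re-)joining group" cycles in the logs above) no longer delays the other stream's batches.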