谁能解释一下 Kafka/Spark 中所有这些值的含义?
Could anyone explain what all these values mean in Kafka/Spark?
我正在执行 spark 结构化 streaming/kafka,这些正在记录到控制台中,但我不确定所有这些是什么意思。
2019-08-04 15:02:08 INFO MicroBatchExecution:54 - Streaming query made progress: {
"id" : "81474cea-e15d-425b-a69e-c3fe7b8f32d0",
"runId" : "9060d86d-344d-45bb-ac20-68318e704e30",
"name" : null,
"timestamp" : "2019-08-04T22:02:08.870Z",
"batchId" : 130,
"numInputRows" : 48,
"inputRowsPerSecond" : 3692.3076923076924,
"processedRowsPerSecond" : 615.3846153846154,
"durationMs" : {
"addBatch" : 33,
"getBatch" : 2,
"getOffset" : 1,
"queryPlanning" : 3,
"triggerExecution" : 78,
"walCommit" : 38
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[service-calls]]",
"startOffset" : {
"service-calls" : {
"0" : 17141
}
},
"endOffset" : {
"service-calls" : {
"0" : 17189
}
},
"numInputRows" : 48,
"inputRowsPerSecond" : 3692.3076923076924,
"processedRowsPerSecond" : 615.3846153846154
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@2f1cf834"
}
}
2019-08-04 15:02:08 INFO MicroBatchExecution:54 - Streaming query made progress: {
"id" : "81474cea-e15d-425b-a69e-c3fe7b8f32d0",
"runId" : "9060d86d-344d-45bb-ac20-68318e704e30",
"name" : null,
"timestamp" : "2019-08-04T22:02:08.983Z",
"batchId" : 131,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"durationMs" : {
"getOffset" : 0,
"triggerExecution" : 0
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[service-calls]]",
"startOffset" : {
"service-calls" : {
"0" : 17189
}
},
"endOffset" : {
"service-calls" : {
"0" : 17189
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@2f1cf834"
}
}
我想我主要好奇的是
numInputRows
、inputRowsPerSecond
、startOffset
和 endOffset
。
我还添加了这个选项 option("maxOffsetsPerTrigger", 2000)
以接收每个触发器 2000 个偏移量,但我似乎找不到这个值在哪里可见。我真的收到了 2000 个补偿吗?如果没有,我如何将代理数量增加到 receive/process 条消息?
我也在使用独立模式 (local[2])。
ProgressReporter 的作用是提供一个接口,一旦实现,就可以自由地报告有关流式查询执行情况的统计信息。
其中一个实现是 org.apache.spark.sql.execution.streaming.MicroBatchExecution。
一切都从执行流式查询触发器(处理或事件时间)开始。触发器做的第一件事是调用 ProgressReporter 的 startTrigger 方法。此方法准备报告器为刚启动的统计数据累积 execution.The 报告器注册有关执行几个不同步骤的统计信息。
下一步是数据处理,记者也会收集一些统计数据。
添加这些统计信息后,ProgressReporter 调用 finishTrigger(hasNewData: Boolean)
。此方法完成触发器执行并创建包含执行统计信息的对象,这些统计信息将被放入 progressBuffer = new mutable.Queue[StreamingQueryProgress]()
。
稍后客户端可以通过 public 访问器方法(如 lastProgress() 或 recentProgress()
直接从那里检索更新(或最后一个)
关于输出:
有关触发期间 StreamingQuery 执行进度的信息。每个事件都与为流式查询的单个触发器完成的处理有关。即使没有新数据可供处理,也会发出事件。
numInputRows : The aggregate (across all sources) number of records processed in a trigger.
inputRowsPerSecond : The aggregate (across all sources) rate of data arriving.
processedRowsPerSecond :
The aggregate (across all sources) rate at which Spark is processing data.
这些自描述字段提供了有关工作绩效的关键指标
现在让我们进入来源部分
"sources" : [ {
"description" : "KafkaSource[Subscribe[service-calls]]",
"startOffset" : {
"service-calls" : {
"0" : 17189
}
},
"endOffset" : {
"service-calls" : {
"0" : 17189
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0 } ]
它是有关源在触发期间执行 StreamingQuery 的进度的信息。
startOffset The starting offset for data being read.
endOffset The ending offset for data being read.
numInputRows The number of records read from this source.
inputRowsPerSecond The rate at which data is arriving from this source.
processedRowsPerSecond The rate at which data from this source is being processed by Spark.
最后一部分
如您所见:
使用 maxOffsetsPerTrigger 选项限制每个 trigger.The 指定总偏移量获取的记录数将按比例拆分到不同卷的 topicPartitions。
这意味着对于每个触发器或获取进程,Kafka 将获得 2000 条记录。
但是,如果您检查日志
您在批次 130 中得到 48 行
endOffset - startOffset
= 17189 - 17141 = 48.
0 表示批次 131。
maxOffsetsPerTrigger 是一个配置,不会在 StreamingQueryProgress 中返回,因此您找不到它。
最后,如果您确实没有足够的消息可以使用,我认为增加代理的数量目前对您没有帮助。
我正在执行 spark 结构化 streaming/kafka,这些正在记录到控制台中,但我不确定所有这些是什么意思。
2019-08-04 15:02:08 INFO MicroBatchExecution:54 - Streaming query made progress: {
"id" : "81474cea-e15d-425b-a69e-c3fe7b8f32d0",
"runId" : "9060d86d-344d-45bb-ac20-68318e704e30",
"name" : null,
"timestamp" : "2019-08-04T22:02:08.870Z",
"batchId" : 130,
"numInputRows" : 48,
"inputRowsPerSecond" : 3692.3076923076924,
"processedRowsPerSecond" : 615.3846153846154,
"durationMs" : {
"addBatch" : 33,
"getBatch" : 2,
"getOffset" : 1,
"queryPlanning" : 3,
"triggerExecution" : 78,
"walCommit" : 38
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[service-calls]]",
"startOffset" : {
"service-calls" : {
"0" : 17141
}
},
"endOffset" : {
"service-calls" : {
"0" : 17189
}
},
"numInputRows" : 48,
"inputRowsPerSecond" : 3692.3076923076924,
"processedRowsPerSecond" : 615.3846153846154
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@2f1cf834"
}
}
2019-08-04 15:02:08 INFO MicroBatchExecution:54 - Streaming query made progress: {
"id" : "81474cea-e15d-425b-a69e-c3fe7b8f32d0",
"runId" : "9060d86d-344d-45bb-ac20-68318e704e30",
"name" : null,
"timestamp" : "2019-08-04T22:02:08.983Z",
"batchId" : 131,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"durationMs" : {
"getOffset" : 0,
"triggerExecution" : 0
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[service-calls]]",
"startOffset" : {
"service-calls" : {
"0" : 17189
}
},
"endOffset" : {
"service-calls" : {
"0" : 17189
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@2f1cf834"
}
}
我想我主要好奇的是
numInputRows
、inputRowsPerSecond
、startOffset
和 endOffset
。
我还添加了这个选项 option("maxOffsetsPerTrigger", 2000)
以接收每个触发器 2000 个偏移量,但我似乎找不到这个值在哪里可见。我真的收到了 2000 个补偿吗?如果没有,我如何将代理数量增加到 receive/process 条消息?
我也在使用独立模式 (local[2])。
ProgressReporter 的作用是提供一个接口,一旦实现,就可以自由地报告有关流式查询执行情况的统计信息。 其中一个实现是 org.apache.spark.sql.execution.streaming.MicroBatchExecution。
一切都从执行流式查询触发器(处理或事件时间)开始。触发器做的第一件事是调用 ProgressReporter 的 startTrigger 方法。此方法准备报告器为刚启动的统计数据累积 execution.The 报告器注册有关执行几个不同步骤的统计信息。
下一步是数据处理,记者也会收集一些统计数据。
添加这些统计信息后,ProgressReporter 调用 finishTrigger(hasNewData: Boolean)
。此方法完成触发器执行并创建包含执行统计信息的对象,这些统计信息将被放入 progressBuffer = new mutable.Queue[StreamingQueryProgress]()
。
稍后客户端可以通过 public 访问器方法(如 lastProgress() 或 recentProgress()
直接从那里检索更新(或最后一个)关于输出:
有关触发期间 StreamingQuery 执行进度的信息。每个事件都与为流式查询的单个触发器完成的处理有关。即使没有新数据可供处理,也会发出事件。
numInputRows : The aggregate (across all sources) number of records processed in a trigger.
inputRowsPerSecond : The aggregate (across all sources) rate of data arriving.
processedRowsPerSecond :
The aggregate (across all sources) rate at which Spark is processing data.
这些自描述字段提供了有关工作绩效的关键指标
现在让我们进入来源部分
"sources" : [ {
"description" : "KafkaSource[Subscribe[service-calls]]",
"startOffset" : {
"service-calls" : {
"0" : 17189
}
},
"endOffset" : {
"service-calls" : {
"0" : 17189
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0 } ]
它是有关源在触发期间执行 StreamingQuery 的进度的信息。
startOffset The starting offset for data being read.
endOffset The ending offset for data being read.
numInputRows The number of records read from this source.
inputRowsPerSecond The rate at which data is arriving from this source.
processedRowsPerSecond The rate at which data from this source is being processed by Spark.
最后一部分
如您所见:
使用 maxOffsetsPerTrigger 选项限制每个 trigger.The 指定总偏移量获取的记录数将按比例拆分到不同卷的 topicPartitions。
这意味着对于每个触发器或获取进程,Kafka 将获得 2000 条记录。 但是,如果您检查日志 您在批次 130 中得到 48 行 endOffset - startOffset = 17189 - 17141 = 48.
0 表示批次 131。
maxOffsetsPerTrigger 是一个配置,不会在 StreamingQueryProgress 中返回,因此您找不到它。
最后,如果您确实没有足够的消息可以使用,我认为增加代理的数量目前对您没有帮助。