"row +: savedState.toSeq" 在 StateStoreRestoreExec.doExecute 中做什么?
What does "row +: savedState.toSeq" do in StateStoreRestoreExec.doExecute?
我们可以看到如下StateStoreRestoreExec
case class StateStoreRestoreExec(
keyExpressions: Seq[Attribute],
stateId: Option[OperatorStateId],
child: SparkPlan)
extends UnaryExecNode with StateStoreReader {
override protected def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsWithStateStore(
getStateId.checkpointLocation,
operatorId = getStateId.operatorId,
storeVersion = getStateId.batchId,
keyExpressions.toStructType,
child.output.toStructType,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
iter.flatMap { row =>
val key = getKey(row)
val savedState = store.get(key)
numOutputRows += 1
row +: savedState.toSeq
}
}
在这里,我想知道 row +: savedState.toSeq
的含义。我认为 row 是 UnsafeRow 的一个实例, savedState.toSeq 是 Seq 的一个实例。那么我们如何用+:
来操作它们呢?另一方面,我认为 savedState 是 UnsafeRow 的实例并且 toSeq 不是 UnsaveRow 的成员,那么 savedState.toSeq
是如何工作的?
row
是 InternalRow
的一个实例,savedState
是 Option[UnsafeRow]
的一个实例,它扩展了 InternalRow
。这里发生的是保存状态从 Option[UnsafeRow]
转换为 Seq[UnsafeRow]
,然后 row
实例被添加到该序列之前。
当您 flatMap
在这些 UnsafeRow
对象上时,您会得到一个 Iterator[UnsafeRow]
。
我们可以看到如下StateStoreRestoreExec
case class StateStoreRestoreExec(
keyExpressions: Seq[Attribute],
stateId: Option[OperatorStateId],
child: SparkPlan)
extends UnaryExecNode with StateStoreReader {
override protected def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsWithStateStore(
getStateId.checkpointLocation,
operatorId = getStateId.operatorId,
storeVersion = getStateId.batchId,
keyExpressions.toStructType,
child.output.toStructType,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
iter.flatMap { row =>
val key = getKey(row)
val savedState = store.get(key)
numOutputRows += 1
row +: savedState.toSeq
}
}
在这里,我想知道 row +: savedState.toSeq
的含义。我认为 row 是 UnsafeRow 的一个实例, savedState.toSeq 是 Seq 的一个实例。那么我们如何用+:
来操作它们呢?另一方面,我认为 savedState 是 UnsafeRow 的实例并且 toSeq 不是 UnsaveRow 的成员,那么 savedState.toSeq
是如何工作的?
row
是 InternalRow
的一个实例,savedState
是 Option[UnsafeRow]
的一个实例,它扩展了 InternalRow
。这里发生的是保存状态从 Option[UnsafeRow]
转换为 Seq[UnsafeRow]
,然后 row
实例被添加到该序列之前。
当您 flatMap
在这些 UnsafeRow
对象上时,您会得到一个 Iterator[UnsafeRow]
。