Spark Dataframe leftanti Join Fails
We are trying to publish deltas from a Hive table to Kafka. The table in question is a single-partition, single-block file of 244 MB. Our cluster is configured with a 256M block size, so we are just about at the maximum for a single file in this case.
Each time the table is updated, a copy is archived, and then we run our delta process.
In the function below we have isolated the different joins and confirmed that the inner join performs acceptably (about 3 minutes), but the two anti-join dataframes will not complete — we keep throwing more resources at the Spark job but continue to see the errors below.
Is there a practical limit to the size of dataframes for this kind of join?
private class DeltaColumnPublisher(spark: SparkSession, sink: KafkaSink, source: RegisteredDataset)
  extends BasePublisher(spark, sink, source) with Serializable {

  val deltaColumn = "hadoop_update_ts" // TODO: move to the dataset object

  def publishDeltaRun(dataLocation: String, archiveLocation: String): (Long, Long) = {
    val current = spark.read.parquet(dataLocation)
    val previous = spark.read.parquet(archiveLocation)

    // `keys` (the join key columns) is assumed to be inherited from BasePublisher
    val inserts = current.join(previous, keys, "leftanti")
    val updates = current.join(previous, keys).where(current.col(deltaColumn) =!= previous.col(deltaColumn))
    val deletes = previous.join(current, keys, "leftanti")

    val upsertCounter = spark.sparkContext.longAccumulator("upserts")
    val deleteCounter = spark.sparkContext.longAccumulator("deletes")

    logInfo("sending inserts to kafka")
    sink.sendDeltasToKafka(inserts, "U", upsertCounter)
    logInfo("sending updates to kafka")
    sink.sendDeltasToKafka(updates, "U", upsertCounter)
    logInfo("sending deletes to kafka")
    sink.sendDeltasToKafka(deletes, "D", deleteCounter)

    (upsertCounter.value, deleteCounter.value)
  }
}
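For reference, the same inserts/deletes can also be expressed with the built-in except() method mentioned in the answer below. A minimal sketch, as it might appear inside publishDeltaRun, assuming keys is a Seq[String] of the join key column names:

import org.apache.spark.sql.functions.col

// except() compares entire rows, so project down to the key columns first.
val currentKeys  = current.select(keys.map(col): _*)
val previousKeys = previous.select(keys.map(col): _*)

val insertedKeys = currentKeys.except(previousKeys)  // keys only in the new snapshot
val deletedKeys  = previousKeys.except(currentKeys)  // keys only in the archive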
The errors we see seem to indicate that the driver is losing contact with the executors. We have increased executor memory up to 24G, the network timeout up to 900s, and the heartbeat interval up to 120s:
17/11/27 20:36:18 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@596e3aa6,BlockManagerId(1, server, 46292, None))] in 2 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
at ...
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
at ...
Later in the logs:
17/11/27 20:42:37 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@25d1bd5f,BlockManagerId(1, server, 46292, None))] in 3 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
at ...
Caused by: java.lang.RuntimeException: org.apache.spark.SparkException: Could not find HeartbeatReceiver.
The configuration switches we have been manipulating (without success) are --executor-memory 24G --conf spark.network.timeout=900s --conf spark.executor.heartbeatInterval=120s
The option I had not considered was increasing my driver resources. I added --driver-memory 4G and --driver-cores 2 and my job completed in about 9 minutes.
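Put together, the submit command looked roughly like the sketch below; the main class and jar names are placeholders, not the actual ones from our project:

spark-submit \
  --class com.example.DeltaPublisherApp \
  --executor-memory 24G \
  --driver-memory 4G \
  --driver-cores 2 \
  --conf spark.network.timeout=900s \
  --conf spark.executor.heartbeatInterval=120s \
  delta-publisher.jar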
The inner join of these two files (or using the built-in except() method) seems to put memory pressure on the executors. Partitioning on one of the key columns seems to help relieve that memory pressure, but it increases the overall time because more shuffling is involved.
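A minimal sketch of that repartitioning, assuming keys.head is one of the join key columns:

import org.apache.spark.sql.functions.col

// Repartition both sides on a key column before joining; this trades
// extra shuffle time for lower per-executor memory pressure.
val currentByKey  = current.repartition(col(keys.head))
val previousByKey = previous.repartition(col(keys.head))
val inserts = currentByKey.join(previousByKey, keys, "leftanti")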
Doing the left-anti join of these two files required us to have more driver resources. I did not expect that.
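One way to investigate why driver resources matter here is to inspect the physical plan Spark chooses for each join, for example whether it picked a broadcast strategy (the broadcast side is built on the driver, which could explain the pressure there):

// Print the physical plan for each delta dataframe; a
// BroadcastHashJoin / BroadcastNestedLoopJoin node would point
// to work being done on the driver before the broadcast.
inserts.explain()
deletes.explain()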