Pyspark 为什么 GroupBy(和 Group By with count())对 LGBMClassifier 的结果产生不一致的结果

Pyspark Why does GroupBy (and GroupBy with count()) on results of GBMClassifier produces inconsistent result

在 Pyspark 中,我通过 GBMClassifier 运行加载了一个大型数据集。在 train/fitting 之前,对输入数据执行 groupby 会产生预期结果(值加起来等于预期计数等)。但是,在拟合测试数据后,对预测使用 GroupBy 不会给出可重现的结果。我试图生成一个基本的 Precision/Recall,所以我试图分成标签和预测组。输出的结果不会有很大变化,但会四处移动并且不可靠。我没有使用 MultiClassMetrics,因为我想探索不同的分类概率阈值,但在这一点上是开放的。我无法将我的输出 DataFrame 转换为 MultiClassMetrics 接受的格式。

我尝试了 GroupBy 和 Count() 以及对特定数据集的过滤,以查看使用两种不同的方法是否会得出不同的结果(即,如果列中的数据与过滤器)

值得一提的是,我正在 4 节点集群上的 EMR Notebooks 中使用 AWS。

train_df=splits[0]
test_df=splits[1]

gbm = GBTClassifier(stepSize=0.1, seed=2018)

model_gbm = gbm.fit(train_df)
prediction_gbm = model_gbm.transform(test_df)

#Split the probability column into two values to allow assessment of different classification thresholds
prediction_gbm = (prediction_gbm.withColumn("probability_split",to_array(col("probability")))        .withColumn('prob_norm',col("probability_split")0]).withColumn('prob_fraud',col("probability_split")[1]))

#Test a new threshold

newPrediction = when(col('prob_fraud')>0.5,1).otherwise(0)
prediction_gbm = prediction_gbm.withColumn('newPrediction',newPrediction)

#This section simply prints the results of my grouping. This is what is producing inconsistent results
gbm_FN=prediction_gbm.filter((F.col('label')==1) & (F.col('newPrediction')==0)).count()
gbm_FP=prediction_gbm.filter((F.col('label')==0) & (F.col('newPrediction')==1)).count()
gbm_TP=prediction_gbm.filter((F.col('label')==1) & (F.col('newPrediction')==1)).count()
gbm_TN=prediction_gbm.filter((F.col('label')==0) & (F.col('newPrediction')==0)).count()

#Here is the groupBy code as well for clarification
prediction_gbm.groupBy(['label','prediction']).count().show()

我希望标签和预测的 4 组输出值能够一致地相加。此外,我希望 groupby 的结果与产生的 4 个值相同,并且加起来是相同的值。

编辑:当我训练我的模型时,我在第一次通过时遇到了这个错误,但是之后当我 运行 它时,我没有看到这个问题:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.6/site-packages/awseditorssparkmonitoringwidget-1.0-py3.6.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 905

我花了一些时间研究这个问题,结果不一致的问题似乎是由我最初的答案中没有包含的 randomSplit() 函数引起的。解决方案是在拆分之前缓存数据帧。更多信息在这里:

至于错误,这显然是 PySpark 的超时。稍微查了一下表明这可能是内存不足错误,但很难在 EMR 笔记本中进行调试。直接对主节点使用 spark-submit 时不会发生这种情况,这表明它可能是基于内存的,因为您可以在 spark-submit 中将内存向上推,而这在笔记本中并不那么容易。