运行 对多个列进行并行回归
Running regression on several columns in parallel
我有一个带有标签列的非常宽的数据框。我想 运行 独立地对每一列进行逻辑回归。我正在尝试找到并行 运行 的最有效方法。
+----------+--------+--------+--------+-----+------------+
| features | label1 | label2 | label3 | ... | label30000 |
+----------+--------+--------+--------+-----+------------+
我最初的想法是使用 ThreadPoolExecutor
,获取每一列的结果,然后加入:
extract_prob = udf(lambda x: float(x[1]), FloatType())
def lr_for_column(argm):
col_name = argm[0]
test_res = argm[1]
lr = LogisticRegression(featuresCol="features", labelCol=col_name, regParam=0.1)
lrModel = lr.fit(tfidf)
res = lrModel.transform(test_tfidf)
test_res = test_res.join(res.select('id', 'probability'), on="id")
test_res = test_res.withColumn(col_name, extract_prob('probability')).drop("probability")
return test_res.select('id', col_name)
with futures.ThreadPoolExecutor(max_workers=100) as executor:
future_results = [executor.submit(lr_for_column, [colname, test_res]) for colname in list_of_label_columns]
futures.wait(future_results)
for future in future_results:
test_res = test_res.join(future.result(), on="id")
但是这种方法的性能不是很好。有更快的方法吗?
考虑到可用资源,使用 ThreadPoolExecutor
- 和 200 个分区您无法同时处理约 16% 的数据,而这部分数据可以如果数据增长,只会变得更糟。
如果你想训练 30000 个模型并使用默认的迭代次数(100,在实践中可能很低)你的 Spark 程序将提交大约 3000000 个作业(每次迭代创建一个单独的作业),并且只有一小部分每个都可以同时处理 - 除非您添加更多资源,否则这不会给改进带来太大希望。
尽管有些事情您可以尝试:
- 确保不必重新计算最终特征。如有必要,将数据写入持久存储并将其加载回来,并确保缓存传递给模型的数据。
- 考虑应用一些降维算法。特征数300000不仅高,而且接近记录数(500000)。它不仅计算量大,而且会导致严重的过拟合。
如果您决定减少维度,请考虑抽样以进一步减少训练数据的大小,从而减少分区数量并提高整体吞吐量。
如果您的数据中有很强的线性趋势,即使在较小的样本上也应该可见,而不会显着降低精度。
考虑用不需要多个作业的变体替换昂贵的 pyspark.ml
算法,例如使用 spark-sklearn
中的一些工具组合(您可以创建集成模型,通过在每个分区上拟合 sklearn
模型)。
超额订阅内核。例如,如果您有 4 个物理内核/节点,则允许 8 或 16 个计算 IO 等待时间。
我有一个带有标签列的非常宽的数据框。我想 运行 独立地对每一列进行逻辑回归。我正在尝试找到并行 运行 的最有效方法。
+----------+--------+--------+--------+-----+------------+
| features | label1 | label2 | label3 | ... | label30000 |
+----------+--------+--------+--------+-----+------------+
我最初的想法是使用 ThreadPoolExecutor
,获取每一列的结果,然后加入:
extract_prob = udf(lambda x: float(x[1]), FloatType())
def lr_for_column(argm):
col_name = argm[0]
test_res = argm[1]
lr = LogisticRegression(featuresCol="features", labelCol=col_name, regParam=0.1)
lrModel = lr.fit(tfidf)
res = lrModel.transform(test_tfidf)
test_res = test_res.join(res.select('id', 'probability'), on="id")
test_res = test_res.withColumn(col_name, extract_prob('probability')).drop("probability")
return test_res.select('id', col_name)
with futures.ThreadPoolExecutor(max_workers=100) as executor:
future_results = [executor.submit(lr_for_column, [colname, test_res]) for colname in list_of_label_columns]
futures.wait(future_results)
for future in future_results:
test_res = test_res.join(future.result(), on="id")
但是这种方法的性能不是很好。有更快的方法吗?
考虑到可用资源,使用 ThreadPoolExecutor
-
如果你想训练 30000 个模型并使用默认的迭代次数(100,在实践中可能很低)你的 Spark 程序将提交大约 3000000 个作业(每次迭代创建一个单独的作业),并且只有一小部分每个都可以同时处理 - 除非您添加更多资源,否则这不会给改进带来太大希望。
尽管有些事情您可以尝试:
- 确保不必重新计算最终特征。如有必要,将数据写入持久存储并将其加载回来,并确保缓存传递给模型的数据。
- 考虑应用一些降维算法。特征数300000不仅高,而且接近记录数(500000)。它不仅计算量大,而且会导致严重的过拟合。
如果您决定减少维度,请考虑抽样以进一步减少训练数据的大小,从而减少分区数量并提高整体吞吐量。
如果您的数据中有很强的线性趋势,即使在较小的样本上也应该可见,而不会显着降低精度。
考虑用不需要多个作业的变体替换昂贵的
pyspark.ml
算法,例如使用spark-sklearn
中的一些工具组合(您可以创建集成模型,通过在每个分区上拟合sklearn
模型)。超额订阅内核。例如,如果您有 4 个物理内核/节点,则允许 8 或 16 个计算 IO 等待时间。