使用 Apache Beam 进行线性回归
Linear Regression with Apache Beam
如何在光束流水线中拟合大量线性回归?我有一个很大的 csv,我想根据两列 A 和 B 对每一列(大约 500)进行归一化。也就是说,我想为 csv X 中的每一列获得 X ~ A + B 的标准残差。
这是一个有趣的用例。你可以这样做:
INDEX_A = # Something
INDEX_B = # Something else
parsed_rows = pipeline | beam.ReadFromText(my_csv)
| beam.Map(parse_each_line)
def column_paired_rows(row):
for idx, val in row:
if idx in (INDEX_A, INDEX_B): continue
# Yield the values keyed with the independent + dependent variable indices
yield ((INDEX_A, idx), {'independent_var_value': row[INDEX_A],
'independent_var_idx': INDEX_A,
'dependent_var_value': val,
'dependent_var_idx': idx})
yield ((INDEX_B, idx), {'independent_var_value': row[INDEX_B],
'independent_var_idx': INDEX_B,
'dependent_var_value': val,
'dependent_var_idx': idx})
column_pairs = parsed_rows | beam.FlatMap(column_paired_rows) | beam.GroupByKey()
column_pairs
PCollection 将通过 independent, dependent
变量对对所有元素进行分组,然后您可以 运行 进行分析。
def perform_linear_regression(elm):
key = elm[0] # KEY is a tuple with (independent variable index, dependent variable index)
values = elm[1] # This is an iterable with the data points that you need.
pairs = [(v['independent_var_value'], v['dependent_var_value']) for v in values]
model = linear_regression(pairs)
return (key, model)
models = column_pairs | beam.Map(perform_linear_regression)
LMK 如果您希望我添加更多详细信息
如何在光束流水线中拟合大量线性回归?我有一个很大的 csv,我想根据两列 A 和 B 对每一列(大约 500)进行归一化。也就是说,我想为 csv X 中的每一列获得 X ~ A + B 的标准残差。
这是一个有趣的用例。你可以这样做:
INDEX_A = # Something
INDEX_B = # Something else
parsed_rows = pipeline | beam.ReadFromText(my_csv)
| beam.Map(parse_each_line)
def column_paired_rows(row):
for idx, val in row:
if idx in (INDEX_A, INDEX_B): continue
# Yield the values keyed with the independent + dependent variable indices
yield ((INDEX_A, idx), {'independent_var_value': row[INDEX_A],
'independent_var_idx': INDEX_A,
'dependent_var_value': val,
'dependent_var_idx': idx})
yield ((INDEX_B, idx), {'independent_var_value': row[INDEX_B],
'independent_var_idx': INDEX_B,
'dependent_var_value': val,
'dependent_var_idx': idx})
column_pairs = parsed_rows | beam.FlatMap(column_paired_rows) | beam.GroupByKey()
column_pairs
PCollection 将通过 independent, dependent
变量对对所有元素进行分组,然后您可以 运行 进行分析。
def perform_linear_regression(elm):
key = elm[0] # KEY is a tuple with (independent variable index, dependent variable index)
values = elm[1] # This is an iterable with the data points that you need.
pairs = [(v['independent_var_value'], v['dependent_var_value']) for v in values]
model = linear_regression(pairs)
return (key, model)
models = column_pairs | beam.Map(perform_linear_regression)
LMK 如果您希望我添加更多详细信息