Get the average of data coming from thousands of sensors
I have been trying to build a Dataflow pipeline that gets data from Pub/Sub and publishes it to Bigtable or BigQuery. I can write the raw data for 1 sensor, but when I try to compute the mean of the windowed data (60 seconds) I cannot do it for thousands of sensors.
To illustrate the scenario:
My data payload
data = {
    "timestamp": "2021-01-27 13:56:01.634717+08:00",
    "location": "location1",
    "name": "name1",
    "datapoint1": "Some Integer",
    "datapoint2": "Some Integer",
    "datapoint3": "Some String",
    .....
    "datapointN": "Some Integer",
}
In my example there will be thousands of sensors, each with a full name of "{location}_{name}". For each sensor I want to window the data into 60 seconds and compute the average of that windowed data.
The final form I am expecting
I want the final form to exist as 1 element, so that it can be inserted into Bigtable and BigQuery:
finalform = {
    "timestamp": "2021-01-27 13:56:01.634717+08:00",
    "location": "location1",
    "name": "name1",
    "datapoint1": "Integer That Has Been Averaged",
    "datapoint2": "Integer That Has Been Averaged",
    "datapoint3": "String that has been left alone",
    .....
    "datapointN": "Integer That Has Been Averaged",
}
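For context, the sketch below shows how one such averaged element could eventually be written out to BigQuery. This is only a hedged illustration: the averaged_rows PCollection name, the table spec and the schema are placeholders and not part of my actual setup, and Bigtable would need its own connector.

# Hedged sketch only: "averaged_rows" is a hypothetical PCollection of
# finalform-style dicts; the table spec and schema are placeholders.
averaged_rows | beam.io.WriteToBigQuery(
    'my-project:sensors.sensor_averages',
    schema='timestamp:TIMESTAMP,location:STRING,name:STRING,'
           'datapoint1:FLOAT,datapoint2:FLOAT,datapoint3:STRING',
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)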
My solution so far, which needs help:
import json

import apache_beam as beam
from apache_beam import window

p = beam.Pipeline()
rawdata = p | "Read" >> beam.io.ReadFromPubSub(topic=topic)
jsonData = rawdata | "Parse Json" >> beam.Map(json.loads)
windoweddata = jsonData | beam.WindowInto(window.FixedWindows(60))
groupedData = windoweddata | beam.GroupBy(location=lambda s: s["location"], name=lambda s: s["name"])
Right after that last line I am stuck. I want to be able to apply CombineValues so that I can use a mean. However, after applying GroupBy I get a tuple of (named key, grouped values). I then ran a ParDo to split the JSON into (key, value) tuples in preparation for CombineValues, but that mixes all the data together again, so sensor data from different locations is now mixed within the PCollection.
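For a single numeric field the per-key mean itself is not the problem; a minimal, self-contained sketch (the sample values and the field name data1 are purely illustrative) using Beam's built-in Mean combiner is shown below. The hard part is doing this for every numeric datapoint of the payload at once while leaving the string fields alone.

import apache_beam as beam

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([
            {"location": "L1", "name": "S1", "data1": 1},
            {"location": "L1", "name": "S1", "data1": 3},
            {"location": "L2", "name": "S3", "data1": 10},
        ])
        # Key each record by its sensor identity and keep only the numeric field.
        | beam.Map(lambda s: ((s["location"], s["name"]), s["data1"]))
        # Average the values per (location, name) key.
        | beam.combiners.Mean.PerKey()
        | beam.Map(print))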
My challenges
So, in the clearest form, I am facing 2 main challenges:
- How to apply CombineValues to my pipeline
- How to apply a mean to the pipeline while ignoring entries of type "string"
Any help is greatly appreciated.
My partial solution so far, with chamikara's help:
import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam import window

# Debugging stub from my partial solution: the prints show what the grouped
# elements and accumulators look like; the averaging logic is not implemented yet.
class AverageFn(beam.CombineFn):
    def create_accumulator(self):
        print(dir(self))
        return (1, 2, 3, 4, 5, 6)

    def add_input(self, sum_count, input):
        print("add input", sum_count, input)
        return sum_count

    def merge_accumulators(self, accumulators):
        print(accumulators)
        data = zip(*accumulators)
        return data

    def extract_output(self, sum_count):
        print("extract_output", sum_count)
        data = sum_count
        return data
with beam.Pipeline() as pipeline:
    total = (
        pipeline
        | 'Create plant counts' >> beam.Create([
            {
                "timestamp": "2021-01-27 13:55:41.634717+08:00",
                "location": "L1",
                "name": "S1",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
            {
                "timestamp": "2021-01-27 13:55:41.634717+08:00",
                "location": "L1",
                "name": "S2",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
            {
                "timestamp": "2021-01-27 13:55:41.634717+08:00",
                "location": "L2",
                "name": "S3",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
            {
                "timestamp": "2021-01-27 13:55:51.634717+08:00",
                "location": "L1",
                "name": "S1",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
            {
                "timestamp": "2021-01-27 13:55:51.634717+08:00",
                "location": "L1",
                "name": "S2",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
            {
                "timestamp": "2021-01-27 13:55:51.634717+08:00",
                "location": "L2",
                "name": "S3",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
            {
                "timestamp": "2021-01-27 13:56:01.634717+08:00",
                "location": "L1",
                "name": "S1",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
            {
                "timestamp": "2021-01-27 13:56:01.634717+08:00",
                "location": "L1",
                "name": "S2",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
            {
                "timestamp": "2021-01-27 13:56:01.634717+08:00",
                "location": "L2",
                "name": "S3",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
        ])
        | beam.GroupBy(location=lambda s: s["location"], name=lambda s: s["name"])
        | beam.CombinePerKey(AverageFn())
        | beam.Map(print))
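For reference, the accumulator of an averaging CombineFn is usually a (sum, count) pair rather than the placeholder tuple above; a minimal sketch for a single numeric value (not the multi-field version I actually need) would look like this:

import apache_beam as beam

class SingleValueAverageFn(beam.CombineFn):
    # Minimal (sum, count) accumulator sketch; use as
    # beam.CombinePerKey(SingleValueAverageFn()) over (key, number) pairs.
    def create_accumulator(self):
        return 0.0, 0

    def add_input(self, accumulator, value):
        total, count = accumulator
        return total + value, count + 1

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float('nan')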
The final answer is below. My breakthrough was realizing that I should not use GroupBy but beam.Map instead, because beam.Map is a 1-to-1 transform. I transform each row of my data into a (key, data) tuple, where the key is essentially a unique identifier for that row that I build with beam.Row(); later I collect and combine those rows with CombinePerKey.
import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam import window
DATA = [
    {
        "timestamp": "2021-01-27 13:55:41.634717+08:00",
        "location": "L1",
        "name": "S1",
        "data1": 1,
        "data2": "STRING",
        "data3": 5,
        "data4": 5,
    },
    {
        "timestamp": "2021-01-27 13:55:41.634717+08:00",
        "location": "L1",
        "name": "S2",
        "data1": 9,
        "data2": "STRING",
        "data3": 2,
        "data4": 2,
    },
    {
        "timestamp": "2021-01-27 13:55:41.634717+08:00",
        "location": "L2",
        "name": "S3",
        "data1": 10,
        "data2": "STRING",
        "data3": 4,
        "data4": 1,
    },
    {
        "timestamp": "2021-01-27 13:55:51.634717+08:00",
        "location": "L1",
        "name": "S1",
        "data1": 11,
        "data2": "STRING",
        "data3": 2,
        "data4": 7,
    },
    {
        "timestamp": "2021-01-27 13:55:51.634717+08:00",
        "location": "L1",
        "name": "S2",
        "data1": 1,
        "data2": "STRING",
        "data3": 4,
        "data4": 8,
    },
    {
        "timestamp": "2021-01-27 13:55:51.634717+08:00",
        "location": "L2",
        "name": "S3",
        "data1": 9,
        "data2": "STRING",
        "data3": 7,
        "data4": 8,
    },
    {
        "timestamp": "2021-01-27 13:56:01.634717+08:00",
        "location": "L1",
        "name": "S1",
        "data1": 2,
        "data2": "STRING",
        "data3": 3,
        "data4": 5,
    },
    {
        "timestamp": "2021-01-27 13:56:01.634717+08:00",
        "location": "L1",
        "name": "S2",
        "data1": 6,
        "data2": "STRING",
        "data3": 7,
        "data4": 6,
    },
    {
        "timestamp": "2021-01-27 13:56:01.634717+08:00",
        "location": "L2",
        "name": "S3",
        "data1": 8,
        "data2": "STRING",
        "data3": 1,
        "data4": 2,
    },
]
class AverageFn2(beam.CombineFn):
    def create_accumulator(self):
        # The accumulator is a (payload dict, element count) pair.
        accumulator = {}, 0
        return accumulator

    def add_input(self, accumulator, input):
        rowdata, count = accumulator
        # Sum every numeric field; non-numeric fields keep the first value seen.
        for key, value in input.items():
            if key in rowdata:
                try:
                    rowdata[key] += float(value)
                except (TypeError, ValueError):
                    pass  # non-numeric value (e.g. a string), leave it alone
            else:
                rowdata[key] = value
        return rowdata, count + 1

    def merge_accumulators(self, accumulators):
        rowdata, counts = zip(*accumulators)
        payload = {}
        # Combine all the accumulators: sum numeric fields, leave strings alone.
        for dictionary in rowdata:
            for key, value in dictionary.items():
                if key in payload:
                    try:
                        payload[key] += float(value)
                    except (TypeError, ValueError):
                        pass  # non-numeric value, keep the first one seen
                else:
                    payload[key] = value
        return payload, sum(counts)

    def extract_output(self, accumulator):
        rowdata, count = accumulator
        # Divide only the numeric fields by the element count to get the mean.
        for key, value in rowdata.items():
            try:
                float(value)
                rowdata[key] = rowdata[key] / count
            except (TypeError, ValueError):
                pass  # leave strings untouched
        return rowdata
with beam.Pipeline() as pipeline:
    total = (
        pipeline
        | 'Create plant counts' >> beam.Create(DATA)
        | beam.Map(lambda item: (beam.Row(location=item["location"], name=item["name"]), item))
        | beam.CombinePerKey(AverageFn2())
        | beam.Map(print))
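Wiring this back into the original streaming pipeline would look roughly like the sketch below. The topic string and the streaming options are assumptions on my side, and AverageFn2 is the class defined above:

import json

import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(streaming=True)
topic = "projects/your-project/topics/your-topic"  # hypothetical topic

with beam.Pipeline(options=options) as p:
    _ = (
        p
        | "Read" >> beam.io.ReadFromPubSub(topic=topic)
        | "Parse Json" >> beam.Map(json.loads)
        | "Window" >> beam.WindowInto(window.FixedWindows(60))
        # One (key, data) tuple per record; the key uniquely identifies the sensor.
        | "Key per sensor" >> beam.Map(
            lambda item: (beam.Row(location=item["location"], name=item["name"]), item))
        | "Average" >> beam.CombinePerKey(AverageFn2())
        | "Print" >> beam.Map(print))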
Hope this helps another Dataflow newbie like me.