Get the average of data coming from thousands of sensors

I have been trying to build a Dataflow pipeline that takes data from Pub/Sub and publishes it to Bigtable or BigQuery. I can write the raw data for 1 sensor, but when I try to compute the mean of windowed data (60 seconds), I cannot do it for thousands of sensors.

To illustrate the scenario:

My data payload

data = {
  "timestamp": "2021-01-27 13:56:01.634717+08:00",
  "location": "location1",
  "name" : "name1",
  "datapoint1" : "Some Integer",
  "datapoint2" : "Some Integer",
  "datapoint3" : "Some String",
   .....
  "datapointN" : "Some Integer",
}

In my example there will be thousands of sensors whose full name is "{location}_{name}". For each sensor, I want to window the data into 60-second windows and compute the average of that data.
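
To make the naming concrete, the composite sensor name would be built roughly like this (a plain-Python sketch; data is the payload above):

# e.g. "location1_name1"
sensor_id = f"{data['location']}_{data['name']}"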

The final form I expect

I will take this final form, which exists as a single element, for insertion into Bigtable and BigQuery:

finalform = {
  "timestamp": "2021-01-27 13:56:01.634717+08:00",
  "location": "location1",
  "name" : "name1",
  "datapoint1" : "Integer That Has Been Averaged",
  "datapoint2" : "Integer That Has Been Averaged",
  "datapoint3" : "String that has been left alone",
   .....
  "datapointN" : "Integer That Has Been Averaged",
}

My solution so far, which needs help

import json

import apache_beam as beam
from apache_beam import window

p = beam.Pipeline()
rawdata = p | "Read" >> beam.io.ReadFromPubSub(topic=topic)
jsonData = rawdata | "Parse Json" >> beam.Map(json.loads)
windoweddata = jsonData | beam.WindowInto(window.FixedWindows(60))
groupedData = windoweddata | beam.GroupBy(location=lambda s: s["location"], name=lambda s: s["name"])

Now, after the last line, I am stuck. I want to be able to apply CombineValues to take the mean. However, after applying GroupBy I get a (namedkey, value) tuple. I then ran a ParDo to split the JSON into (key, value) tuples to prepare for CombineValues, but all the data got mixed together again: sensor data from different locations was now mixed within the PCollection.
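
Roughly, the problematic step looked like the sketch below (a reconstruction, not my exact ParDo). Flattening the grouped values back into bare (field, value) tuples throws the composite key away, which is exactly why readings from different sensors end up mixed:

# kv[0] is the composite key (location, name); kv[1] is the iterable of
# payload dicts for that key. Emitting bare (field, value) pairs loses kv[0].
mixed = groupedData | beam.FlatMap(
    lambda kv: [(field, value)
                for row in kv[1]
                for field, value in row.items()])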

My challenges

So, put as clearly as I can, I face 2 main challenges:

  1. How to apply CombineValues to my pipeline
  2. How to apply a mean to the pipeline while ignoring entries of type "string" (see the plain-Python sketch after this list)
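
To make challenge 2 concrete, the per-field behaviour I am after looks roughly like this plain-Python helper (outside Beam; the name average_fields is hypothetical):

def average_fields(rows):
    """Average numeric fields across rows; keep non-numeric fields as-is."""
    out = dict(rows[-1])  # start from the latest row's values
    for key, value in out.items():
        if isinstance(value, (int, float)):
            out[key] = sum(row[key] for row in rows) / len(rows)
    return out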

Any help is very welcome.

My partial solution so far, with chamikara's help

import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam import window
class AverageFn(beam.CombineFn):
  # Debugging stub: the prints show when and with what arguments each
  # CombineFn method is called; the returned values are placeholders.
  def create_accumulator(self):
    print(dir(self))
    return (1,2,3,4,5,6)

  def add_input(self, sum_count, input):
    print("add input",sum_count,input)
    return sum_count

  def merge_accumulators(self, accumulators):
    print(accumulators)
    data = zip(*accumulators)
    return data

  def extract_output(self, sum_count):
    print("extract_output",sum_count)
    data = sum_count
    return data

with beam.Pipeline() as pipeline:
  total = (
      pipeline
      | 'Create plant counts' >> beam.Create([
          {
              "timestamp": "2021-01-27 13:55:41.634717+08:00",
              "location":"L1",
              "name":"S1",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
          {
              "timestamp": "2021-01-27 13:55:41.634717+08:00",
              "location":"L1",
              "name":"S2",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
          {
              "timestamp": "2021-01-27 13:55:41.634717+08:00",
              "location":"L2",
              "name":"S3",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
          {
              "timestamp": "2021-01-27 13:55:51.634717+08:00",
              "location":"L1",
              "name":"S1",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
          {
              "timestamp": "2021-01-27 13:55:51.634717+08:00",
              "location":"L1",
              "name":"S2",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
          {
              "timestamp": "2021-01-27 13:55:51.634717+08:00",
              "location":"L2",
              "name":"S3",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
          {
              "timestamp": "2021-01-27 13:56:01.634717+08:00",
              "location":"L1",
              "name":"S1",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
          {
              "timestamp": "2021-01-27 13:56:01.634717+08:00",
              "location":"L1",
              "name":"S2",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
          {
              "timestamp": "2021-01-27 13:56:01.634717+08:00",
              "location":"L2",
              "name":"S3",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
      ])
      | beam.GroupBy(location=lambda s: s["location"], name=lambda s: s["name"]) 
      | beam.CombinePerKey(AverageFn())
      | beam.Map(print))

chamikara's two pointers were:

  1. See the Combine section (specifically CombinePerKey) here. You should first arrange your data into a PCollection of KVs, using an appropriate key (for example, a combination of location and name). That PCollection can then be followed by a CombinePerKey with a CombineFn implementation that combines the given data objects (by averaging the respective fields).
  2. This should be done inside your CombineFn implementation, where you should combine the relevant fields and ignore the string fields.
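
A minimal sketch of pointer 1, reusing the windowed PCollection and the AverageFn stub from my snippets above (tuple keys here; my final answer below switches to beam.Row):

# Arrange the payloads into a keyed PCollection, then combine per key.
keyed = windoweddata | beam.Map(
    lambda d: ((d["location"], d["name"]), d))
averaged = keyed | beam.CombinePerKey(AverageFn())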

The final answer is below. My breakthrough was realizing that I should not use GroupBy, but beam.Map instead, because beam.Map is a 1-to-1 transform. I transform each row of my data into a (key, data) tuple, where the key is essentially a unique identifier I assign to that row with beam.Row(); later I gather the rows and combine them with CombinePerKey.

import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam import window

DATA = [
        {
              "timestamp": "2021-01-27 13:55:41.634717+08:00",
              "location":"L1",
              "name":"S1",
              "data1":1,
              "data2":"STRING",
              "data3":5,
              "data4":5,
          },
          {
              "timestamp": "2021-01-27 13:55:41.634717+08:00",
              "location":"L1",
              "name":"S2",
              "data1":9,
              "data2":"STRING",
              "data3":2,
              "data4":2,
          },
          {
              "timestamp": "2021-01-27 13:55:41.634717+08:00",
              "location":"L2",
              "name":"S3",
              "data1":10,
              "data2":"STRING",
              "data3":4,
              "data4":1,
          },
          {
              "timestamp": "2021-01-27 13:55:51.634717+08:00",
              "location":"L1",
              "name":"S1",
              "data1":11,
              "data2":"STRING",
              "data3":2,
              "data4":7,
          },
          {
              "timestamp": "2021-01-27 13:55:51.634717+08:00",
              "location":"L1",
              "name":"S2",
              "data1":1,
              "data2":"STRING",
              "data3":4,
              "data4":8,
          },
          {
              "timestamp": "2021-01-27 13:55:51.634717+08:00",
              "location":"L2",
              "name":"S3",
              "data1":9,
              "data2":"STRING",
              "data3":7,
              "data4":8,
          },
          {
              "timestamp": "2021-01-27 13:56:01.634717+08:00",
              "location":"L1",
              "name":"S1",
              "data1":2,
              "data2":"STRING",
              "data3":3,
              "data4":5,
          },
          {
              "timestamp": "2021-01-27 13:56:01.634717+08:00",
              "location":"L1",
              "name":"S2",
              "data1":6,
              "data2":"STRING",
              "data3":7,
              "data4":6,
          },
          {
              "timestamp": "2021-01-27 13:56:01.634717+08:00",
              "location":"L2",
              "name":"S3",
              "data1":8,
              "data2":"STRING",
              "data3":1,
              "data4":2,
          },
]

class AverageFn2(beam.CombineFn):
  def create_accumulator(self):
    accumulator = {}, 0  # Accumulator is (running payload, row count)
    return accumulator

  def add_input(self, accumulator, input):
    rowdata, count = accumulator

    # Add each numeric field to the running sum; keep non-numeric fields
    # (strings such as the timestamp) as-is instead of overwriting them.
    for key, value in input.items():
        if key in rowdata:
            try:
                rowdata[key] += float(value)
            except (ValueError, TypeError):
                rowdata[key] = value  # non-numeric: keep the latest value
        else:
            rowdata[key] = value

    return rowdata, count + 1

  def merge_accumulators(self, accumulators):
    rowdata, counts = zip(*accumulators)

    payload = {}

    # Combine all the accumulators: sum numeric fields, keep strings.
    for dictionary in rowdata:
        for key, value in dictionary.items():
            if key in payload:
                try:
                    payload[key] += float(value)
                except (ValueError, TypeError):
                    payload[key] = value
            else:
                payload[key] = value

    return payload, sum(counts)

  def extract_output(self, accumulator):
    rowdata, count = accumulator

    # Divide each summed numeric field by the row count; leave the
    # non-numeric fields untouched.
    for key, value in rowdata.items():
        try:
            rowdata[key] = float(value) / count
        except (ValueError, TypeError):
            pass

    return rowdata

with beam.Pipeline() as pipeline:
  total = (
      pipeline
      | 'Create plant counts' >> beam.Create(DATA)
      | beam.Map( lambda item: (beam.Row(location=item["location"],name=item["name"]),item) )
      | beam.CombinePerKey(AverageFn2())
      | beam.Map(print))
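
For the real streaming case, the same combiner should slot into the Pub/Sub pipeline from the top of the question. A sketch, assuming topic, the imports from that earlier snippet, and the 60-second fixed windows I originally wanted (untested against a live topic):

with beam.Pipeline() as p:
  averages = (
      p
      | "Read" >> beam.io.ReadFromPubSub(topic=topic)
      | "Parse Json" >> beam.Map(json.loads)
      | "Window" >> beam.WindowInto(window.FixedWindows(60))
      | "Key by sensor" >> beam.Map(
          lambda item: (beam.Row(location=item["location"], name=item["name"]), item))
      | "Average per sensor" >> beam.CombinePerKey(AverageFn2())
      | beam.Map(print))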

Hope this helps another Dataflow newbie like me.