如何过滤数据流

How to filter a stream of data

我正在尝试为 Raspi 上的传感器读数实施过滤器。我从传感器中获得了源源不断的数据流,我想使用我的过滤器代码来过滤这些数据。我试图通过在我的范围代码中生成随机数来模拟我将从传感器获得的那种读数。问题是由于某种原因我看不到过滤器的任何输出,我什至不知道它是否有效。下面是生成随机数据流和过滤器的代码。我在模拟传感器读数吗?(注意:我不是特别关心这些值,我只想要一个可以继续读取和过滤的随机数据流)

生成随机数据

# generate random integer values, Filename: Rangen.py
from random import seed
from random import randint
from time import sleep
# seed random number generator
seed(1)
# generate some integers
def rangen():
    value = randint(0, 100)
    return value

while True:
    data = rangen()
    sleep(0.1)

这是过滤器

from time import sleep
from statistics import mean
from Rangen import data

def AplhaTrimmedfilter(windowsize,alpha, sensordata):
    data_list = []
    Flag = True
    while Flag:
        if len(data_list)<windowsize:
            data_list.append(sensordata)
            print("data added")
            sleep(0.0010)
            break
    if len(data_list)==windowsize:
        sorted_data = data_list.sort()     
        sliced_data = sorted_data[(windowsize/2):windowsize-alpha+(alpha/2)]
        alphamean = statistics.mean(sliced_data) #sum(a) / len(a)
        data_list = []
        print(data_list)
    return alphamean

AlphaTrimmedfilter(5,2,data)

我想从生成的数据中取出 5 个值,对它们进行排序、切片和平均并在最后显示。我不明白我是否正确实施了这一点,因为控制台上的输出没有显示任何内容。任何输入表示赞赏。谢谢你。

编辑:正如 MisterMiyagi 所建议的,我使用生成器来解决过滤问题,并使用 [[item] for item in filtered_data] 遍历生成器对象,但现在我遇到了一个奇怪的 TypeError 'Float' 对象不可迭代。我将如何解决这个问题?我认为这可能是由于我传递给过滤器的数据类型(主要是浮点数)。以下是基于 MisterMiyagi 的回答修改过滤器的代码:

def alpha_mean(window, alpha):
    cut = alpha//2
    data = sorted(window)[cut:-cut]
    result = sum(data)/len(data)
    return result

def alphatrimmer(window_size, alpha, sensor_stream):
    window = []
    for item in sensor_stream:
        window.append(item)
        if len(window) >= window_size:
            break
    yield alpha_mean(window, alpha)

    for item in sensor_stream:
        window.pop(0)
        window.append(item)
        yield alpha_mean(window,alpha)

Python 的原生等价流是迭代器。您可以通过编写生成器来创建自己的迭代器。例如,我们可以将您的 rangen 函数产生一个值 变成一个 产生多个值的生成器函数 .

# stream.py
import random

random.seed(1)

def random_stream():
    """Generator producing a stream of random numbers"""
    while True:      # generators can produce values indefinitely...
        value = random.randint(0, 100)
        yield value  # ... by yield'ing them

除其他外,iterators/generators 可以被 for 语句消耗。您可以在 Python 控制台上进行测试:

>>> from stream import random_stream
>>> data = random_stream()   # note: the generator is called using ()
>>> for idx, item in enumerate(data):
...     if idx >= 3: break
...     print(idx, ':', item)
0 : 17
1 : 72
2 : 97

理想情况下,使用这种流的函数是 也是一个生成器 - 它不是应用固定的 window 一次,而是将 window溪流。这允许无缝地使用整个流,无论它有多长。

理想情况下,您将 window 的跟踪和每个 window 位置的计算分开。值得注意的是,后者 不是 生成器 - 它仅适用于单个 window 状态。

def alpha_mean(window, alpha):
    """Compute the mean for a given window and alpha"""
    size = len(window)
    data = sorted(window)[size//2:size - (alpha // 2)]
    return sum(data) / len(data)


def alpha_trim(window_size, alpha, sensor_stream):
    """Produce a trimmed stream based on the ``sensor_stream`` data"""
    window = []  # note: a collections.deque is more efficient for very large windows
    # fill the first window
    for item in sensor_stream:
        window.append(item)
        if len(window) >= window_size:
            break
    yield alpha_mean(window, alpha)
    # advance the window as new data comes in
    for item in sensor_stream:
        # "move" the window by discarding the oldest value
        window.pop(0)
        window.append(item)
        yield alpha_mean(window, alpha)

alpha_trim是一个生成器,也带一个生成器。您可以在 Python 控制台中再次测试:

>>> data = alpha_trim(5, 2, rangen())
>>> for idx, item in enumerate(data):
...     if idx >= 5: break
...     print(idx, ':', item)
0 : 52.0
1 : 52.0
2 : 47.5
3 : 47.5
4 : 60.0