Python 异步 IO 图像处理
Python ASYNC IO Image Processing
我目前正在从网络摄像头拍摄图像,拍摄后应立即对其进行处理。该代码使用 cv2.videocapture() 函数。
想法:一旦捕获了图像,我想利用系统捕获新图像所需的时间来对先前的图像进行计算。
为了完成这个任务,我使用了 asyncio 库。
首先,我定义了一个生成器函数,它生成 n 个图像,这些图像应该存储在队列中:
async def produce(queue, n):
for x in range(n):
# produce an item
print('producing {}/{}'.format(x, n))
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
item = obtainImage()
z.append(x)
# put the item in the queue
await queue.put(item)
之后我实现了一个消费者函数,它等待队列中的条目并计算一些图像特征(这里是一个简单的阈值):
async def consume(queue):
while True:
# wait for an item from the producer
# Load the image stored in the queue
item = await queue.get()
# process the item and store value
ret,thresh1 = cv2.threshold(item,127,255,cv2.THRESH_BINARY)
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
# Notify the queue that the item has been processed
queue.task_done()
#Return the Processesed image
return thresh1
为了启动函数,我创建了一个 运行 函数并将其添加到事件循环中(运行 方法包含一个参数,表示我要处理的图像数量):
async def run(n):
queue = asyncio.Queue()
# schedule the consumer
consumer = asyncio.ensure_future(consume(queue))
# run the producer and wait for completion
await produce(queue, n)
# wait until the consumer has processed all items
await queue.join()
# the consumer is still awaiting for an item, cancel it
consumer.cancel()
loop = asyncio.get_event_loop()
loop.run_until_complete(run(10))
print(loop)
loop.close()
只要我使用打印语句,我就会得到这样的异步输出:
生产 0/10
生产 1/10
消耗 0...
生产 2/10
消耗 1...
生产 3/10
消耗 2...
生产 4/10
消耗 3...
这正是我想要的顺序。但是,我无法取回任何阈值图像。我错过了什么?
提前致谢
带全局队列返回结果的最小示例(使用 Python 3.5.2 测试)
import asyncio
import random
outQ = asyncio.Queue()
async def produce(queue, n):
for x in range(n):
print('producing {}/{}'.format(x, n))
await asyncio.sleep(random.random())
item = x
await queue.put(item)
async def consume(queue):
while True:
print('consume ')
item = await queue.get()
thresh1 = item**2
print("... adding %d" % thresh1)
await outQ.put(thresh1)
await asyncio.sleep(random.random())
queue.task_done()
async def run(n):
queue = asyncio.Queue()
consumer = asyncio.ensure_future(consume(queue))
await produce(queue, n)
await queue.join()
consumer.cancel()
loop = asyncio.get_event_loop()
loop.run_until_complete(run(10))
loop.close()
while not outQ.empty():
print(outQ.get_nowait())
输出:
...
producing 9/10
... adding 64
consume
... adding 81
consume
0
1
4
9
16
25
36
49
64
81
我目前正在从网络摄像头拍摄图像,拍摄后应立即对其进行处理。该代码使用 cv2.videocapture() 函数。
想法:一旦捕获了图像,我想利用系统捕获新图像所需的时间来对先前的图像进行计算。
为了完成这个任务,我使用了 asyncio 库。
首先,我定义了一个生成器函数,它生成 n 个图像,这些图像应该存储在队列中:
async def produce(queue, n):
for x in range(n):
# produce an item
print('producing {}/{}'.format(x, n))
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
item = obtainImage()
z.append(x)
# put the item in the queue
await queue.put(item)
之后我实现了一个消费者函数,它等待队列中的条目并计算一些图像特征(这里是一个简单的阈值):
async def consume(queue):
while True:
# wait for an item from the producer
# Load the image stored in the queue
item = await queue.get()
# process the item and store value
ret,thresh1 = cv2.threshold(item,127,255,cv2.THRESH_BINARY)
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
# Notify the queue that the item has been processed
queue.task_done()
#Return the Processesed image
return thresh1
为了启动函数,我创建了一个 运行 函数并将其添加到事件循环中(运行 方法包含一个参数,表示我要处理的图像数量):
async def run(n):
queue = asyncio.Queue()
# schedule the consumer
consumer = asyncio.ensure_future(consume(queue))
# run the producer and wait for completion
await produce(queue, n)
# wait until the consumer has processed all items
await queue.join()
# the consumer is still awaiting for an item, cancel it
consumer.cancel()
loop = asyncio.get_event_loop()
loop.run_until_complete(run(10))
print(loop)
loop.close()
只要我使用打印语句,我就会得到这样的异步输出:
生产 0/10
生产 1/10
消耗 0...
生产 2/10
消耗 1...
生产 3/10
消耗 2...
生产 4/10
消耗 3...
这正是我想要的顺序。但是,我无法取回任何阈值图像。我错过了什么?
提前致谢
带全局队列返回结果的最小示例(使用 Python 3.5.2 测试)
import asyncio
import random
outQ = asyncio.Queue()
async def produce(queue, n):
for x in range(n):
print('producing {}/{}'.format(x, n))
await asyncio.sleep(random.random())
item = x
await queue.put(item)
async def consume(queue):
while True:
print('consume ')
item = await queue.get()
thresh1 = item**2
print("... adding %d" % thresh1)
await outQ.put(thresh1)
await asyncio.sleep(random.random())
queue.task_done()
async def run(n):
queue = asyncio.Queue()
consumer = asyncio.ensure_future(consume(queue))
await produce(queue, n)
await queue.join()
consumer.cancel()
loop = asyncio.get_event_loop()
loop.run_until_complete(run(10))
loop.close()
while not outQ.empty():
print(outQ.get_nowait())
输出:
...
producing 9/10
... adding 64
consume
... adding 81
consume
0
1
4
9
16
25
36
49
64
81