Combine queues for async IO with auto-enqueue in TensorFlow

I have multiple csv files that contain features. One of the features is the filename of an image. I want to read the csv files line by line and push the path of the corresponding image onto a new queue. The two queues should be processed in parallel.

csv_queue = tf.FIFOQueue(10, tf.string)
csv_init = csv_queue.enqueue_many(['sample1.csv','sample2.csv','sample3.csv'])

path, label = read_label(csv_queue)

image_queue = tf.FIFOQueue(100, tf.string)
image_init = image_queue.enqueue(path)
_, image = read_image(image_queue)


with tf.Session() as sess:
    csv_init.run()
    image_init.run()

    print(sess.run([label, path])) # works
    print(sess.run(image)) # works

    print(sess.run([label, path])) # works
    print(sess.run(image)) # will deadlock unless I do image_init.run()

The implementations of the helper functions (e.g. read_csv) can be found here.

我可以 "hide" 在 sess.run(image) 后面调用 iq_init.run() 以避免死锁并允许批处理吗?

The deadlock occurs on the second call to sess.run() because image_queue is empty, and the reader.read() operation (which produces key and buffer) will block until something is added to the queue. In TensorFlow, keeping a queue filled is typically done by creating a tf.train.QueueRunner, which defines a set of ops that can be run in a background thread to keep moving elements into the queue. For more details, see the tutorial on threading and queues.
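
For the queues in the question, a minimal sketch of that pattern might look as follows; it assumes the read_label and read_image helpers from the linked code are available, and it replaces the manual image_init.run() with a QueueRunner that refills image_queue from a background thread:

import tensorflow as tf

csv_queue = tf.FIFOQueue(10, tf.string)
csv_init = csv_queue.enqueue_many(['sample1.csv', 'sample2.csv', 'sample3.csv'])

path, label = read_label(csv_queue)   # helper from the question, assumed available

image_queue = tf.FIFOQueue(100, tf.string)
# Register a QueueRunner whose enqueue op keeps pushing new paths into image_queue.
tf.train.add_queue_runner(
    tf.train.QueueRunner(image_queue, [image_queue.enqueue(path)]))
_, image = read_image(image_queue)    # helper from the question, assumed available

with tf.Session() as sess:
    csv_init.run()
    coord = tf.train.Coordinator()
    # Launch one background thread per registered QueueRunner.
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)

    print(sess.run(image))
    print(sess.run(image))  # no deadlock: the background thread refills image_queue

    coord.request_stop()
    coord.join(threads)

The difference from the code in the question is that the enqueue op is never run by hand; start_queue_runners launches a thread that calls it repeatedly, so dequeues from image_queue do not starve.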

import tensorflow as tf

filenames = ['./cs_disp_train.txt', './cs_limg_train.txt']
txt_queue = tf.train.string_input_producer(filenames)
# txt_queue = tf.FIFOQueue(10, tf.string)
# init_txt_queue = txt_queue.enqueue_many(filenames)

enqueue_ops = []
image_queues = tf.FIFOQueue(100, tf.string)

num_reader = len(filenames)
for i in range(num_reader):
    reader = tf.TextLineReader()
    _, buffer = reader.read(txt_queue)
    enqueue_ops.append(image_queues.enqueue(buffer))

tf.train.queue_runner.add_queue_runner(
    tf.train.queue_runner.QueueRunner(image_queues, enqueue_ops))

y = image_queues.dequeue()

sess = tf.Session()
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

# sess.run(init_txt_queue)

for _ in range(12):
    print(sess.run([y]))

coord.request_stop()
coord.join(threads)

For example, I have two files, 'cs_disp_train.txt' and 'cs_limg_train.txt'; one holds the file paths of the depth maps and the other the file paths of the corresponding color images. The code creates two FIFO queues: one is used to read these two files, and the other holds all of the file names read from them.

I am using tf.train.QueueRunner too, but I am not sure I fully understand it. I got the inspiration from here, although that example reads TFRecords files. Hope this helps.