Combine queues for async I/O with auto-enqueue in TensorFlow
I have several CSV files containing features. One feature is the filename of an image. I want to read the CSV files line by line and push the path of the corresponding image onto a new queue. The two queues should be processed in parallel.
csv_queue = tf.FIFOQueue(10, tf.string)
csv_init = csv_queue.enqueue_many(['sample1.csv', 'sample2.csv', 'sample3.csv'])
key, path, label = read_label(csv_queue)
image_queue = tf.FIFOQueue(100, tf.string)
image_init = image_queue.enqueue(path)
_, image = read_image(image_queue)

with tf.Session() as sess:
    csv_init.run()
    image_init.run()
    print(sess.run([key, label, path]))  # works
    print(sess.run(image))               # works
    print(sess.run([key, label, path]))  # works
    print(sess.run(image))               # deadlocks unless I run image_init again
The implementations of the helper functions (e.g. read_csv) can be found here.
Can I "hide" the call to image_init.run() behind sess.run(image) to avoid the deadlock and allow batching?
The deadlock occurs on the second call to sess.run(image) because image_queue is empty, and the reader.read() operation (which produces key and buffer) blocks until something is added to the queue. To avoid this, the queue has to be fed continuously in the background. In TensorFlow this is typically done by creating a tf.train.QueueRunner, which defines a set of ops that can be run in background threads to keep moving elements into the queue. For more details, see the tutorial on threading and queues.
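Conceptually, a QueueRunner just runs enqueue ops on background threads so the consumer never sees an empty queue. As a rough analogy (plain Python with the standard `queue` and `threading` modules, not the TensorFlow API), the producer/consumer pattern it implements looks like this:

```python
import queue
import threading

# Bounded queue, analogous to tf.FIFOQueue(100, tf.string)
image_queue = queue.Queue(maxsize=100)

def producer(paths):
    # Plays the role of the QueueRunner thread: keeps feeding the queue.
    for p in paths:
        image_queue.put(p)   # blocks only when the queue is full
    image_queue.put(None)    # sentinel: no more data

paths = ['img%d.png' % i for i in range(5)]
t = threading.Thread(target=producer, args=(paths,))
t.start()

consumed = []
while True:
    item = image_queue.get()  # blocks until the producer adds an item
    if item is None:
        break
    consumed.append(item)
t.join()
print(consumed)
```

Because the producer thread refills the queue independently of the consumer, `get()` never deadlocks the way the second `sess.run(image)` does when nothing re-runs the enqueue op.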
filenames = ['./cs_disp_train.txt', './cs_limg_train.txt']
txt_queue = tf.train.string_input_producer(filenames)
# txt_queue = tf.FIFOQueue(10, tf.string)
# init_txt_queue = txt_queue.enqueue_many(filenames)

enqueue_ops = []
image_queues = tf.FIFOQueue(100, tf.string)
num_readers = len(filenames)
for i in range(num_readers):
    reader = tf.TextLineReader()
    _, line = reader.read(txt_queue)
    enqueue_ops.append(image_queues.enqueue(line))
tf.train.queue_runner.add_queue_runner(
    tf.train.queue_runner.QueueRunner(image_queues, enqueue_ops))
y = image_queues.dequeue()

sess = tf.Session()
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
# sess.run(init_txt_queue)
for _ in range(12):
    print(sess.run([y]))
coord.request_stop()
coord.join(threads)
For example, I have two files, 'cs_disp_train.txt' and 'cs_limg_train.txt': one contains the file paths of the depth maps, the other the paths of the corresponding color images. The code creates two FIFOQueues, one that reads from these two files and another that holds all the filenames.
I am using tf.train.QueueRunner too, but I am not sure I understand it. I got my inspiration from here, although that example reads TFRecords files. Hope this helps.
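For what it's worth, the Coordinator choreography at the end (start the runner threads, consume, then request_stop and join) can be mimicked in plain Python with a `threading.Event`; this sketch only illustrates the control flow, not the TensorFlow API itself:

```python
import queue
import threading

stop = threading.Event()   # plays the role of tf.train.Coordinator
q = queue.Queue(maxsize=10)

def runner():
    # Like one QueueRunner thread: enqueue until asked to stop.
    i = 0
    while not stop.is_set():
        try:
            q.put('line%d' % i, timeout=0.1)  # re-check stop flag periodically
            i += 1
        except queue.Full:
            pass

# Two runner threads, matching the two enqueue ops above.
threads = [threading.Thread(target=runner) for _ in range(2)]
for t in threads:
    t.start()

# Consume a few elements, as the repeated sess.run([y]) calls do.
results = [q.get() for _ in range(12)]

stop.set()                 # coord.request_stop()
for t in threads:
    t.join()               # coord.join(threads)
print(len(results))
```

The timeout on `put()` is what lets each thread notice the stop request even while the queue is full, which is roughly what `Coordinator.request_stop` arranges for real queue runners.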