Different behavior when calling a function vs. inline code
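I am trying to see whether elements of a PCollection can be sent to the parent process when using the DirectRunner in the Apache Beam SDK for Python.
However, I ran into a strange error: when the queue is instantiated and the pipeline is invoked inside the __main__ section of the script, everything seems to work fine, but not when the same code is invoked from inside a sub-function. I am guessing this is due to some pickling/dilling going on behind the scenes, but a more specific explanation would be much appreciated.
The /tmp/inputs/winterstale.txt file used below can be downloaded from: https://storage.googleapis.com/apache-beam-samples/shakespeare/winterstale.txt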
from __future__ import print_function

import atexit
import queue
import tempfile
import time
import unittest

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner
from apache_beam.runners.interactive.cache_manager import FileBasedCacheManager
from apache_beam.runners.interactive.cache_manager import ReadCache
from apache_beam.runners.interactive.cache_manager import WriteCache


def add_to_queue(element, queue):
    queue.put(element)


def write_to_queue():
    q = queue.Queue()
    with beam.Pipeline(runner=BundleBasedDirectRunner()) as p:
        _ = (
            p
            | "Read" >> beam.io.ReadFromText("/tmp/inputs/winterstale.txt")
            | "Remove whitespace" >> beam.Map(lambda element: element.strip("\n\t|"))
            | "Remove empty lines" >> beam.FlatMap(lambda element: [element] if element else [])
            | "Write" >> beam.Map(lambda element: add_to_queue(element, queue=q))
        )
    return list(q.queue)


if __name__ == "__main__":
    cache_location = tempfile.mkdtemp()
    atexit.register(FileSystems.delete, [cache_location])

    # Using a function call
    cache_manager = FileBasedCacheManager(cache_dir=cache_location)
    result1 = write_to_queue()
    print(len(result1))  # >>> prints "0" <<<

    # Copy-pasting the code from "write_to_queue()"
    q = queue.Queue()
    with beam.Pipeline(runner=BundleBasedDirectRunner()) as p:
        _ = (
            p
            | "Read" >> beam.io.ReadFromText("/tmp/inputs/winterstale.txt")
            | "Remove whitespace" >> beam.Map(lambda element: element.strip("\n\t|"))
            | "Remove empty lines" >> beam.FlatMap(lambda element: [element] if element else [])
            | "Write" >> beam.Map(lambda element: add_to_queue(element, queue=q))
        )
    result2 = list(q.queue)
    print(len(result2))  # >>> prints "3561" <<<
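In general, everything gets pickled before being shipped to the runner. In this case, the queue object itself would typically be pickled, and your elements would be appended to the unpickled copy during execution (hence the return value of 0). I think what is happening here is that the BundleBasedDirectRunner is flaky about what it pickles (e.g. depending on whether there was an earlier pickling error caused by including closures from the main session, it may abandon all pickling attempts and go ahead with the original objects).
It may be worth trying some of the other runners, in which case the behavior should be consistent (probably always zero), and if there is a pickling error it will be raised in an informative manner rather than suppressed.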
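To illustrate the copy semantics described above, here is a minimal sketch that uses only the standard library `pickle` module, with a plain list standing in for the queue. It is not Beam's actual serialization path; the names `original_sink` and `worker_copy` are made up for the illustration. The idea it shows is that whatever is appended to the unpickled copy never reaches the object the caller still holds.

```python
import pickle
import queue

# Hypothetical stand-in for the object captured by the pipeline's lambda.
original_sink = []

# Round-trip through pickle, roughly what happens when a runner
# serializes a transform before executing it.
worker_copy = pickle.loads(pickle.dumps(original_sink))

# The "worker" appends to its own copy, not to the caller's object.
for line in ["first line", "second line", "third line"]:
    worker_copy.append(line)

print(len(original_sink))  # 0, the caller's object is untouched
print(len(worker_copy))    # 3, the elements live only in the copy

# A queue.Queue holds thread locks, so the standard pickle module
# refuses to serialize it. Whether and how a given runner reports or
# swallows such a failure is runner-specific and not shown here.
try:
    pickle.dumps(queue.Queue())
    queue_is_picklable = True
except Exception:
    queue_is_picklable = False
print(queue_is_picklable)
```

If the runner skips pickling and uses the original objects, the appends land in the caller's object instead, which would be consistent with the inline version printing a non-zero count.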