AttributeError: 'StreamingPullFuture' object has no attribute 'open'
AttributeError: 'StreamingPullFuture' object has no attribute 'open'
我正在尝试编写一个从 Pub/Sub 订阅中读取消息并将其写入 BigQuery 的作业。
使用的 Python 版本是 3.6。
我在执行代码时收到如下错误:
Traceback (most recent call last):
File "subscriber.py", line 73, in <module>
receive_data(project_id, subscription_name )
File "subscriber.py", line 59, in receive_data
future = subscription.open(callback)
AttributeError: 'StreamingPullFuture' object has no attribute 'open'
subscriber.py 的代码如下:
import base64
import json
import time
from google.cloud import bigquery
from google.cloud import pubsub
# Pub/Sub source handed to receive_data by the __main__ guard.
# NOTE(review): the values look like placeholders to be replaced — confirm.
project_id = "PROJECT_NAME"
subscription_name = "SUBSCRIPTION_NAME"
# BigQuery destination that collect_data passes to write_data_to_bq.
DATASET_ID = 'DATASET_NAME'
TABLE_ID = 'test_data'
def write_data_to_bq(dataset_id, table_id, data):
    """Insert rows into a BigQuery table and print the outcome.

    Args:
        dataset_id: ID of the dataset that holds the target table.
        table_id: ID of the target table inside that dataset.
        data: Rows to insert; passed unchanged to ``client.insert_rows``.

    Returns:
        None. A success line or the list of per-row errors is printed.
    """
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)
    # The fetched table object (not just the reference) is what gets handed
    # to insert_rows below.
    table = client.get_table(table_ref)
    errors = client.insert_rows(table, data)
    if not errors:
        print('Loaded {} row(s) into {}:{}'.format(len(data), dataset_id, table_id))
    else:
        print('Errors:')
        for error in errors:
            print(error)
# decodes the message from PubSub
def collect_data(data):
    """Decode one Pub/Sub payload and forward its rows to BigQuery.

    The payload is URL-safe base64 decoded and parsed as JSON. The ``data``
    field of every entry under the top-level ``messages`` key is collected
    and written to ``DATASET_ID``/``TABLE_ID`` via ``write_data_to_bq``.

    Args:
        data: The base64-encoded message body.
    """
    inputdata = []
    stream = base64.urlsafe_b64decode(data)
    twraw = json.loads(stream)
    # NOTE(review): if the 'messages' key is absent, .get() yields None and
    # the loop below raises TypeError — confirm the payload always has it.
    localmessages = twraw.get('messages')
    for message in localmessages:
        inputdata.append(message['data'])
    # Written once, after the loop, so every collected row goes out in a
    # single insert call.
    write_data_to_bq(DATASET_ID, TABLE_ID, inputdata)
# receive data from topic
def receive_data(project, subscription_name):
    """Listen on a Pub/Sub subscription and process each received message.

    Args:
        project: Project ID used to build the subscription path.
        subscription_name: Name of the subscription to listen on.

    Raises:
        Exception: Whatever ``future.result()`` raises is printed and
            re-raised.
    """
    subscriber = pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_name)

    def callback(message):
        """Log one message, store its payload, then acknowledge it."""
        print('Received message: {}'.format(message))
        collect_data(message.data)
        # Reached only when collect_data returned without raising.
        message.ack()

    subscription = subscriber.subscribe(subscription_path, callback=callback)
    print('Listening for messages on {}'.format(subscription_path))

    # NOTE: this is the line the reported traceback points at. subscribe()
    # has already returned a StreamingPullFuture, which has no open() method,
    # so this call raises AttributeError. It is kept exactly as posted because
    # it is the subject of the question; the working form is to call
    # .result() on the object returned by subscribe().
    future = subscription.open(callback)
    try:
        future.result()
    except Exception as e:
        print(
            'Listening for messages on {} threw an Exception: {}'.format(
                subscription_name, e))
        raise

    while True:
        time.sleep(60)
# Script entry point: start listening with the module-level settings.
if __name__ == '__main__':
    receive_data(project_id, subscription_name)
根据 Google 文档,导入的包是 pubsub_v1,所以我将 pubsub 的导入
替换为
from google.cloud import pubsub_v1
但即使在更改之后似乎也没有任何效果。
还尝试对 requirements.txt 进行更改,因为这似乎是版本问题。
当前 requirements.txt 为:
cachetools==3.1.1
certifi==2019.9.11
chardet==3.0.4
google-api-core==1.14.3
google-auth==1.6.3
google-cloud-bigquery==1.20.0
google-cloud-core==1.0.3
google-cloud-pubsub==1.0.2
google-resumable-media==0.4.1
googleapis-common-protos==1.6.0
grpc-google-iam-v1==0.12.3
grpcio==1.24.1
idna==2.8
protobuf==3.10.0
pyasn1==0.4.7
pyasn1-modules==0.2.7
pytz==2019.3
requests==2.22.0
rsa==4.0
six==1.12.0
urllib3==1.25.6
有人可以指导吗?
subscriber.subscribe(subscription_path, callback=callback)
调用本身就会返回 future 对象,不需要再单独调用 .open()。
只需将其返回值赋给 future:
# subscribe() already returns the future; block on it directly.
future = subscriber.subscribe(subscription_path, callback=callback)
print('Listening for messages on {}'.format(subscription_path))
try:
    future.result()
except Exception as e:
    print(
        'Listening for messages on {} threw an Exception: {}'.format(
            subscription_name, e))
    raise
订阅者客户端 API 中没有 .open() 方法;您已经通过 subscriber.subscribe()
建立了订阅。
我正在尝试编写一个从 Pub/Sub 订阅中读取消息并将其写入 BigQuery 的作业。使用的 Python 版本是 3.6。
我在执行代码时收到如下错误:
Traceback (most recent call last):
File "subscriber.py", line 73, in <module>
receive_data(project_id, subscription_name )
File "subscriber.py", line 59, in receive_data
future = subscription.open(callback)
AttributeError: 'StreamingPullFuture' object has no attribute 'open'
subscriber.py 的代码如下:
import base64
import json
import time
from google.cloud import bigquery
from google.cloud import pubsub
# Pub/Sub source handed to receive_data by the __main__ guard.
# NOTE(review): the values look like placeholders to be replaced — confirm.
project_id = "PROJECT_NAME"
subscription_name = "SUBSCRIPTION_NAME"
# BigQuery destination that collect_data passes to write_data_to_bq.
DATASET_ID = 'DATASET_NAME'
TABLE_ID = 'test_data'
def write_data_to_bq(dataset_id, table_id, data):
    """Insert rows into a BigQuery table and print the outcome.

    Args:
        dataset_id: ID of the dataset that holds the target table.
        table_id: ID of the target table inside that dataset.
        data: Rows to insert; passed unchanged to ``client.insert_rows``.

    Returns:
        None. A success line or the list of per-row errors is printed.
    """
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)
    # The fetched table object (not just the reference) is what gets handed
    # to insert_rows below.
    table = client.get_table(table_ref)
    errors = client.insert_rows(table, data)
    if not errors:
        print('Loaded {} row(s) into {}:{}'.format(len(data), dataset_id, table_id))
    else:
        print('Errors:')
        for error in errors:
            print(error)
# decodes the message from PubSub
def collect_data(data):
    """Decode one Pub/Sub payload and forward its rows to BigQuery.

    The payload is URL-safe base64 decoded and parsed as JSON. The ``data``
    field of every entry under the top-level ``messages`` key is collected
    and written to ``DATASET_ID``/``TABLE_ID`` via ``write_data_to_bq``.

    Args:
        data: The base64-encoded message body.
    """
    inputdata = []
    stream = base64.urlsafe_b64decode(data)
    twraw = json.loads(stream)
    # NOTE(review): if the 'messages' key is absent, .get() yields None and
    # the loop below raises TypeError — confirm the payload always has it.
    localmessages = twraw.get('messages')
    for message in localmessages:
        inputdata.append(message['data'])
    # Written once, after the loop, so every collected row goes out in a
    # single insert call.
    write_data_to_bq(DATASET_ID, TABLE_ID, inputdata)
# receive data from topic
def receive_data(project, subscription_name):
    """Listen on a Pub/Sub subscription and process each received message.

    Args:
        project: Project ID used to build the subscription path.
        subscription_name: Name of the subscription to listen on.

    Raises:
        Exception: Whatever ``future.result()`` raises is printed and
            re-raised.
    """
    subscriber = pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_name)

    def callback(message):
        """Log one message, store its payload, then acknowledge it."""
        print('Received message: {}'.format(message))
        collect_data(message.data)
        # Reached only when collect_data returned without raising.
        message.ack()

    subscription = subscriber.subscribe(subscription_path, callback=callback)
    print('Listening for messages on {}'.format(subscription_path))

    # NOTE: this is the line the reported traceback points at. subscribe()
    # has already returned a StreamingPullFuture, which has no open() method,
    # so this call raises AttributeError. It is kept exactly as posted because
    # it is the subject of the question; the working form is to call
    # .result() on the object returned by subscribe().
    future = subscription.open(callback)
    try:
        future.result()
    except Exception as e:
        print(
            'Listening for messages on {} threw an Exception: {}'.format(
                subscription_name, e))
        raise

    while True:
        time.sleep(60)
# Script entry point: start listening with the module-level settings.
if __name__ == '__main__':
    receive_data(project_id, subscription_name)
根据 Google 文档,导入的包是 pubsub_v1,所以我将 pubsub 的导入
替换为
from google.cloud import pubsub_v1
但即使在更改之后似乎也没有任何效果。
还尝试对 requirements.txt 进行更改,因为这似乎是版本问题。 当前 requirements.txt 为:
cachetools==3.1.1
certifi==2019.9.11
chardet==3.0.4
google-api-core==1.14.3
google-auth==1.6.3
google-cloud-bigquery==1.20.0
google-cloud-core==1.0.3
google-cloud-pubsub==1.0.2
google-resumable-media==0.4.1
googleapis-common-protos==1.6.0
grpc-google-iam-v1==0.12.3
grpcio==1.24.1
idna==2.8
protobuf==3.10.0
pyasn1==0.4.7
pyasn1-modules==0.2.7
pytz==2019.3
requests==2.22.0
rsa==4.0
six==1.12.0
urllib3==1.25.6
有人可以指导吗?
subscriber.subscribe(subscription_path, callback=callback)
调用本身就会返回 future 对象,不需要再单独调用 .open()。
只需将其返回值赋给 future:
# subscribe() already returns the future; block on it directly.
future = subscriber.subscribe(subscription_path, callback=callback)
print('Listening for messages on {}'.format(subscription_path))
try:
    future.result()
except Exception as e:
    print(
        'Listening for messages on {} threw an Exception: {}'.format(
            subscription_name, e))
    raise
订阅者客户端 API 中没有 .open() 方法;您已经通过 subscriber.subscribe()
建立了订阅。