Unclear on error message in upload from pandas to Google BigQuery table
Situation
I'm trying to upload a pandas DataFrame of Twitter API data to a table in BigQuery.
Here is the DataFrame preparation code from a Google Colab notebook:
!pip install --upgrade google-cloud-language
!pip install pandas-gbq -U
from google.colab import files
uploaded = files.upload()
for fn in uploaded.keys():
    print('User uploaded file "{name}" with length {length} bytes'.format(name=fn, length=len(uploaded[fn])))
import os
# Imports Credential File:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "pp-004a-d61bf3451d85.json"
print("Service Account Key: {}".format(os.environ["GOOGLE_APPLICATION_CREDENTIALS"]))
!pip install --upgrade tweepy
# VARIABLES
interval = "15"
start = '2022-04-07'
end = '2022-04-12'
# Tweepy
searchQ = '(max muncy) -is:retweet lang:en'
intval_tw = "{}T".format(interval)
start_tw = '{}T00:00:00Z'.format(start)
end_tw = '{}T23:59:59Z'.format(end)
# index = pd.date_range('1/1/2000', periods=9, freq='T')
# D = calendar day frequency, H = hourly frequency, T, min = minutely frequency
# Library installs
import tweepy
# from twitter_authentication import bearer_token
import time
import pandas as pd
import requests
import json
import numpy as np
bearer_token = "BEARER_TOKEN"
client = tweepy.Client(bearer_token, wait_on_rate_limit=True)
# NEED TO ENSURE HAVE ALL PARAMETERS
gathered_tweets = []
for response in tweepy.Paginator(client.search_recent_tweets,
                                 query=searchQ,
                                 user_fields=['name', 'description', 'username', 'profile_image_url', 'url', 'pinned_tweet_id', 'verified', 'created_at', 'location', 'public_metrics', 'entities'],
                                 tweet_fields=['public_metrics', 'created_at', 'lang', 'attachments', 'context_annotations', 'conversation_id', 'entities', 'geo', 'in_reply_to_user_id', 'possibly_sensitive', 'referenced_tweets', 'reply_settings', 'source'],
                                 media_fields=['duration_ms', 'media_key', 'preview_image_url', 'type', 'url', 'height', 'width', 'public_metrics'],
                                 expansions=['author_id', 'attachments.media_keys', 'entities.mentions.username', 'geo.place_id', 'in_reply_to_user_id', 'referenced_tweets.id', 'referenced_tweets.id.author_id'],
                                 start_time=start_tw,
                                 end_time=end_tw,
                                 max_results=100):
    time.sleep(1)
    gathered_tweets.append(response)
result = []
user_dict = {}
# Loop through each response object
for response in gathered_tweets:
    # Take all of the users, and put them into a dictionary of dictionaries with the info we want to keep
    for user in response.includes['users']:
        user_dict[user.id] = {'username': user.username,
                              'created_at': user.created_at,
                              'location': user.location,
                              'verified': user.verified,
                              'name': user.name,
                              'description': user.description,
                              'url': user.url,
                              'profile_image_url': user.profile_image_url,
                              'pinned_tweet': user.pinned_tweet_id,
                              'entities': user.entities,
                              'followers': user.public_metrics['followers_count'],
                              'total_tweets': user.public_metrics['tweet_count'],
                              'following': user.public_metrics['following_count'],
                              'listed': user.public_metrics['listed_count'],
                              'tweets': user.public_metrics['tweet_count']
                              }
    for tweet in response.data:
        # For each tweet, find the author's information
        author_info = user_dict[tweet.author_id]
        # Put all of the information we want to keep in a single dictionary for each tweet
        result.append({'author_id': tweet.author_id,
                       'username': author_info['username'],
                       'name': author_info['name'],
                       'author_followers': author_info['followers'],
                       'author_following': author_info['following'],
                       'author_tweets': author_info['tweets'],
                       'author_description': author_info['description'],
                       'author_url': author_info['url'],
                       'profile_image_url': author_info['profile_image_url'],
                       # 'pinned_tweet': author_info['pinned_tweet_id'],  https://developer.twitter.com/en/docs/twitter-api/tweets/lookup/api-reference/get-tweets
                       # 'total_tweets': author_info['tweet_count'],
                       # 'listed_count': author_info['listed_count'],
                       'entities': author_info['entities'],
                       'verified': author_info['verified'],
                       'account_created_at': author_info['created_at'],
                       'text': tweet.text,
                       'created_at': tweet.created_at,
                       'lang': tweet.lang,
                       'tweet_id': tweet.id,
                       'retweets': tweet.public_metrics['retweet_count'],
                       'replies': tweet.public_metrics['reply_count'],
                       'likes': tweet.public_metrics['like_count'],
                       'quotes': tweet.public_metrics['quote_count'],
                       'replied': tweet.in_reply_to_user_id,
                       'sensitive': tweet.possibly_sensitive,
                       'referenced_tweets': tweet.referenced_tweets,
                       'reply_settings': tweet.reply_settings,
                       'source': tweet.source
                       # 'video_views': tweet.public_metrics['view_count']
                       })
dfTW00 = pd.DataFrame(result)
dfTW01 = dfTW00
# Create 'engagement' metric
dfTW01['engagement'] = dfTW01['retweets'] + dfTW01['replies'] + dfTW01['likes'] + dfTW01['quotes']
# Add 'tweets' column with value of 1
dfTW01['tweets'] = 1
# Engagement Rate calc
dfTW01['eng_rate'] = (dfTW01['tweets'] / dfTW01['engagement'])
# Add twitter link
dfTW01['base_url'] = 'https://twitter.com/twitter/status/'
# base_url = 'https://twitter.com/twitter/status/'
dfTW01['tweet_link'] = dfTW01['base_url'] + dfTW01['tweet_id'].astype(str)
# Imports the Google Cloud client library
from google.cloud import language_v1
# Instantiates a client
client = language_v1.LanguageServiceClient()
def get_sentiment(text):
    # The text to analyze
    document = language_v1.Document(
        content=text,
        type_=language_v1.types.Document.Type.PLAIN_TEXT
    )
    # Detects the sentiment of the text
    sentiment = client.analyze_sentiment(
        request={"document": document}
    ).document_sentiment
    return sentiment
dfTW01["sentiment"] = dfTW01["text"].apply(get_sentiment)
dfTW02 = dfTW01['sentiment'].astype(str).str.split(expand=True)
dfTW02
dfTW03 = pd.merge(dfTW01, dfTW02, left_index=True, right_index=True)
dfTW03.rename(columns = {1:'magnitude', 3:'score'}, inplace=True)
cols = ['magnitude', 'score']
dfTW03[cols] = dfTW03[cols].apply(pd.to_numeric, errors='coerce', axis=1)
def return_status(x):
    if x >= .5:
        return 'Positive'
    elif x <= -.5:
        return 'Negative'
    return 'Neutral'
dfTW03['sentiment2'] = dfTW03['score'].apply(return_status)
What I've tried
Here is what I'm using for the upload (I've confirmed the project, dataset, and table information are correct):
df.to_gbq('004a01.004a-TW-01',
          'pp-004a',
          chunksize=None,
          if_exists='append'
          )
Result
However, that call returns this error message:
TypeError: '<' not supported between instances of 'int' and 'str'
Assessment
I've found several posts on SO that address this error, but I haven't been able to relate them to my situation. (My understanding is that a variety of data types can be uploaded to a BigQuery table.)
First of all, I'm not clear what the error message '<' not supported between instances of 'int' and 'str' means.
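For reference, this is just the standard Python TypeError raised when an int and a str are compared with <; a minimal, illustrative reproduction (not my actual data) looks like this:
# Comparing (or sorting) mixed int and str values raises exactly this error:
1 < "a"
# TypeError: '<' not supported between instances of 'int' and 'str'
So somewhere in the upload, an integer value appears to be getting compared against a string value.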
Any input would be greatly appreciated.
In case it helps, below are the pandas dtypes for my DataFrame.
Dataframe dtypes
Pandas DataFrame dtypes:
author_id int64
username object
name object
author_followers int64
author_following int64
author_tweets int64
author_description object
author_url object
profile_image_url object
entities object
verified bool
account_created_at datetime64[ns, UTC]
text object
created_at datetime64[ns, UTC]
lang object
tweet_id int64
retweets int64
replies int64
likes int64
quotes int64
replied float64
sensitive bool
referenced_tweets object
reply_settings object
source object
engagement int64
tweets int64
eng_rate float64
base_url object
tweet_link object
sentiment object
0 object
magnitude float64
2 object
score float64
sentiment_rating float64
sentiment2 object
dtype: object
Instead of the to_gbq() function from pandas, you can try loading the DataFrame into BigQuery with the load_table_from_dataframe() function from the BigQuery client library.
See the sample Python code below using load_table_from_dataframe():
import datetime
from google.cloud import bigquery
import pandas
import pytz
# Construct a BigQuery client object.
client = bigquery.Client()
# TODO(developer): Set table_id to the ID of the table to create.
table_id = "my-project.my-dataset.my-table"
records = [
    {
        "title": "The Meaning of Life",
        "release_year": 1983,
        "length_minutes": 112.5,
        "release_date": pytz.timezone("Europe/Paris")
        .localize(datetime.datetime(1983, 5, 9, 13, 0, 0))
        .astimezone(pytz.utc),
        # Assume UTC timezone when a datetime object contains no timezone.
        "dvd_release": datetime.datetime(2002, 1, 22, 7, 0, 0),
    },
    {
        "title": "Monty Python and the Holy Grail",
        "release_year": 1975,
        "length_minutes": 91.5,
        "release_date": pytz.timezone("Europe/London")
        .localize(datetime.datetime(1975, 4, 9, 23, 59, 2))
        .astimezone(pytz.utc),
        "dvd_release": datetime.datetime(2002, 7, 16, 9, 0, 0),
    },
    {
        "title": "Life of Brian",
        "release_year": 1979,
        "length_minutes": 94.25,
        "release_date": pytz.timezone("America/New_York")
        .localize(datetime.datetime(1979, 8, 17, 23, 59, 5))
        .astimezone(pytz.utc),
        "dvd_release": datetime.datetime(2008, 1, 14, 8, 0, 0),
    },
    {
        "title": "And Now for Something Completely Different",
        "release_year": 1971,
        "length_minutes": 88.0,
        "release_date": pytz.timezone("Europe/London")
        .localize(datetime.datetime(1971, 9, 28, 23, 59, 7))
        .astimezone(pytz.utc),
        "dvd_release": datetime.datetime(2003, 10, 22, 10, 0, 0),
    },
]
dataframe = pandas.DataFrame(
    records,
    # In the loaded table, the column order reflects the order of the
    # columns in the DataFrame.
    columns=[
        "title",
        "release_year",
        "length_minutes",
        "release_date",
        "dvd_release",
    ],
    # Optionally, set a named index, which can also be written to the
    # BigQuery table.
    index=pandas.Index(
        ["Q24980", "Q25043", "Q24953", "Q16403"], name="wikidata_id"
    ),
)
job_config = bigquery.LoadJobConfig(
    # Specify a (partial) schema. All columns are always written to the
    # table. The schema is used to assist in data type definitions.
    schema=[
        # Specify the type of columns whose type cannot be auto-detected. For
        # example the "title" column uses pandas dtype "object", so its
        # data type is ambiguous.
        bigquery.SchemaField("title", bigquery.enums.SqlTypeNames.STRING),
        # Indexes are written if included in the schema by name.
        bigquery.SchemaField("wikidata_id", bigquery.enums.SqlTypeNames.STRING),
    ],
    # Optionally, set the write disposition. BigQuery appends loaded rows
    # to an existing table by default, but with WRITE_TRUNCATE write
    # disposition it replaces the table with the loaded data.
    write_disposition="WRITE_TRUNCATE",
)
job = client.load_table_from_dataframe(
    dataframe, table_id, job_config=job_config
)  # Make an API request.
job.result()  # Wait for the job to complete.
table = client.get_table(table_id)  # Make an API request.
print(
    "Loaded {} rows and {} columns to {}".format(
        table.num_rows, len(table.schema), table_id
    )
)
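To connect this back to your case, here is a rough, untested sketch of how the same approach might be applied to the dfTW03 DataFrame from the question. The table ID simply reuses the project, dataset, and table names you listed, and the column conversions are an assumption: object columns that hold nested Tweepy or Natural Language API objects (for example entities, referenced_tweets, and sentiment) generally need to be cast to plain strings (or serialized to JSON) before a load job can handle them.
from google.cloud import bigquery

client = bigquery.Client()

# project.dataset.table, reusing the identifiers from the question.
table_id = "pp-004a.004a01.004a-TW-01"

df = dfTW03.copy()

# Cast object columns that contain nested objects to plain strings so the load
# job can serialize them (adjust this list to match your actual columns).
for col in ["entities", "referenced_tweets", "sentiment"]:
    if col in df.columns:
        df[col] = df[col].astype(str)

job_config = bigquery.LoadJobConfig(
    # Optionally pin ambiguous object columns to STRING.
    schema=[
        bigquery.SchemaField("text", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("entities", bigquery.enums.SqlTypeNames.STRING),
    ],
    write_disposition="WRITE_APPEND",  # mirrors if_exists='append' in to_gbq
)

job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
job.result()  # Wait for the load job to finish.
print("Loaded {} rows into {}".format(job.output_rows, table_id))
If the same TypeError persists with this approach, it is worth checking the object columns one by one (for example with df[col].map(type).value_counts()) to see which column still mixes integer and string values.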