Is it possible to randomly sample YouTube comments with YouTube API V3?
I have been trying to download all of the YouTube comments on popular videos using python requests, but after roughly a quarter of the total comments it throws the following error:
{'error': {'code': 400, 'message': "The API server failed to successfully process the request. While this can be a transient error, it usually indicates that the request's input is invalid. Check the structure of the commentThread resource in the request body to ensure that it is valid.", 'errors': [{'message': "The API server failed to successfully process the request. While this can be a transient error, it usually indicates that the request's input is invalid. Check the structure of the commentThread resource in the request body to ensure that it is valid.", 'domain': 'youtube.commentThread', 'reason': 'processingFailure', 'location': 'body', 'locationType': 'other'}]}}
I found a post describing the same problem in detail; it appears it simply isn't possible to download all of the comments on a popular video.
This is my code:
import argparse
import time

import requests
from requests.exceptions import ChunkedEncodingError

start_time = time.time()


class YouTubeApi():

    YOUTUBE_COMMENTS_URL = 'https://www.googleapis.com/youtube/v3/commentThreads'
    comment_counter = 0

    # One API key per line; keys are rotated as quota gets used up.
    with open("API_keys.txt", "r") as f:
        key_list = [key.strip('\n') for key in f.readlines()]

    def format_comments(self, results, likes_required):
        comments_list = []
        try:
            for item in results["items"]:
                comment = item["snippet"]["topLevelComment"]
                likes = comment["snippet"]["likeCount"]
                if likes < likes_required:
                    continue
                author = comment["snippet"]["authorDisplayName"]
                text = comment["snippet"]["textDisplay"]
                entry = "Comment by {}:\n \"{}\"\n\n".format(author, text)
                entry = entry.encode('ascii', 'replace').decode()
                comments_list.append(entry)
                self.comment_counter += 1
                print("Comments downloaded:", self.comment_counter, end="\r")
        except KeyError:
            print(results)
        return comments_list

    def get_video_comments(self, video_id, likes_required):
        # Switch to the next API key every ~900,000 comments.
        if self.comment_counter <= 900000:
            key = self.key_list[0]
        elif self.comment_counter <= 1800000:
            key = self.key_list[1]
        elif self.comment_counter <= 2700000:
            key = self.key_list[2]
        elif self.comment_counter <= 3600000:
            key = self.key_list[3]
        else:
            key = self.key_list[4]
        params = {
            'part': 'snippet,replies',
            'maxResults': 100,
            'videoId': video_id,
            'textFormat': 'plainText',
            'key': key
        }
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'
        }
        try:
            comments_data = requests.get(self.YOUTUBE_COMMENTS_URL, params=params, headers=headers)
        except ChunkedEncodingError:
            print("Chunked Error. Retrying...")
            for n in range(5):
                try:
                    print("Trying", n + 1, "times")
                    comments_data = requests.get(self.YOUTUBE_COMMENTS_URL, params=params, headers=headers)
                    break
                except ChunkedEncodingError as c:
                    print(c)
        results = comments_data.json()
        nextPageToken = results.get("nextPageToken")
        comments_list = []
        comments_list += self.format_comments(results, likes_required)
        while nextPageToken:
            params.update({'pageToken': nextPageToken})
            try:
                comments_data = requests.get(self.YOUTUBE_COMMENTS_URL, params=params, headers=headers)
            except ChunkedEncodingError:
                print("Chunked Error. Retrying...")
                for n in range(5):
                    try:
                        print("Trying", n + 1, "times")
                        comments_data = requests.get(self.YOUTUBE_COMMENTS_URL, params=params, headers=headers)
                        break
                    except ChunkedEncodingError as c:
                        print(c)
            results = comments_data.json()
            nextPageToken = results.get("nextPageToken")
            comments_list += self.format_comments(results, likes_required)
        return comments_list

    def get_video_id_list(self, filename):
        try:
            with open(filename, 'r') as file:
                URL_list = file.readlines()
        except FileNotFoundError:
            exit("File \"" + filename + "\" not found")
        id_list = []
        for url in URL_list:
            if url == "\n":  # ignore empty lines
                continue
            url = url.rstrip('\n')  # delete '\n' at the end of line
            if url.find('=') != -1:  # extract the video id after '='
                id_list.append(url[url.find('=') + 1:])
            else:
                print("Wrong URL")
        return id_list


def main():
    yt = YouTubeApi()
    parser = argparse.ArgumentParser(add_help=False, description=("Download youtube comments from many videos into txt file"))
    required = parser.add_argument_group("required arguments")
    optional = parser.add_argument_group("optional arguments")
    # A required argument was truncated in the original post; only the tail of
    # its help text survives: "here: https://console.developers.google.com/apis/credentials"
    optional.add_argument("--likes", '-l', help="The amount of likes a comment needs to be saved", type=int)
    optional.add_argument("--input", '-i', help="URL list file name")
    optional.add_argument("--output", '-o', help="Output file name")
    optional.add_argument("--help", '-h', help="Help", action='help')
    args = parser.parse_args()
    # --------------------------------------------------------------------- #
    likes = 0
    if args.likes:
        likes = args.likes
    input_file = "URL_list.txt"
    if args.input:
        input_file = args.input
    output_file = "Comments.txt"
    if args.output:
        output_file = args.output
    video_ids = yt.get_video_id_list(input_file)
    if not video_ids:
        exit("No URLs in input file")
    try:
        vid_counter = 0
        with open(output_file, "a") as f:
            for video_id in video_ids:
                vid_counter += 1
                print("Downloading comments for video ", vid_counter, ", id: ", video_id, sep='')
                comments = yt.get_video_comments(video_id, likes)
                if comments:
                    for comment in comments:
                        f.write(comment)
        print('\nDone!')
    except KeyboardInterrupt:
        exit("User Aborted the Operation")
    # --------------------------------------------------------------------- #


if __name__ == '__main__':
    main()
The next best thing would be to randomly sample the comments instead. Does anyone know whether that is possible with API V3?
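To make "randomly sample" concrete: since the total number of comments isn't known up front, a uniform sample could be kept while paging with reservoir sampling. A minimal sketch under that assumption; the function name and sample_size parameter are illustrative, and as far as I know the API itself offers no sampling parameter:

import random

def reservoir_sample(stream, sample_size):
    """Keep a uniform random sample of sample_size items from a stream
    of unknown length (classic reservoir sampling, Algorithm R)."""
    reservoir = []
    for i, item in enumerate(stream):
        if i < sample_size:
            reservoir.append(item)
        else:
            # Keep each later item with probability sample_size / (i + 1),
            # which leaves every item seen so far equally likely to survive.
            j = random.randint(0, i)
            if j < sample_size:
                reservoir[j] = item
    return reservoir

The stream here would be whatever comments the pagination loop manages to fetch, so the sample is uniform only over the comments actually seen before an error stops the loop.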
Even though the API returns a processingFailure error, you can still catch that error (or any other API error, for that matter) in order to terminate your pagination loop gracefully. That way your script will yield the top-level comments it obtained from the API prior to the occurrence of the first API error.

An error response produced by the YouTube Data API has (usually) the following form:
{
  "error": {
    "errors": [
      {
        "domain": <string>,
        "reason": <string>,
        "message": <string>,
        "locationType": <string>,
        "location": <string>
      }
    ],
    "code": <integer>,
    "message": <string>
  }
}
Consequently, you could define the following function:
def is_error_response(response):
    error = response.get('error')
    if error is None:
        return False
    print("API Error: "
          f"code={error['code']} "
          f"domain={error['errors'][0]['domain']} "
          f"reason={error['errors'][0]['reason']} "
          f"message={error['errors'][0]['message']!r}")
    return True
You would call this function after each statement of the form results = comments_data.json(). Upon the first occurrence of that statement, you would have:
results = comments_data.json()
if is_error_response(results):
    return []
nextPageToken = results.get("nextPageToken")
And for the second instance of that statement:
results = comments_data.json()
if is_error_response(results):
    return comments_list
nextPageToken = results.get("nextPageToken")
Note that the function is_error_response above prints an error message on stdout whenever its argument represents an API error response; this is so that the user of the script is made aware of the failed API call.
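For reference, here is roughly how is_error_response slots into a condensed version of the pagination loop above. This is a sketch, not the full script; fetch_top_level_comments and its parameters are names chosen for illustration:

import requests

YOUTUBE_COMMENTS_URL = 'https://www.googleapis.com/youtube/v3/commentThreads'

def fetch_top_level_comments(video_id, api_key):
    """Page through commentThreads.list and return whatever top-level
    comment texts were collected before the first API error, if any."""
    params = {
        'part': 'snippet',
        'maxResults': 100,
        'videoId': video_id,
        'textFormat': 'plainText',
        'key': api_key,
    }
    comments = []
    while True:
        results = requests.get(YOUTUBE_COMMENTS_URL, params=params).json()
        if is_error_response(results):
            # Terminate gracefully, keeping everything fetched so far.
            return comments
        for item in results.get("items", []):
            comments.append(item["snippet"]["topLevelComment"]["snippet"]["textDisplay"])
        next_page_token = results.get("nextPageToken")
        if next_page_token is None:  # no more pages
            return comments
        params['pageToken'] = next_page_token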