是否可以使用 YouTube API V3 随机抽样 YouTube 评论?

Is it possible to randomly sample YouTube comments with YouTube API V3?

我一直在尝试使用 python 请求下载所有关于热门视频的 YouTube 评论,但在大约四分之一的评论总数之后出现以下错误:

{'error': {'code': 400, 'message': "The API server failed to successfully process the request. While this can be a transient error, it usually indicates that the request's input is invalid. Check the structure of the commentThread resource in the request body to ensure that it is valid.", 'errors': [{'message': "The API server failed to successfully process the request. While this can be a transient error, it usually indicates that the request's input is invalid. Check the structure of the commentThread resource in the request body to ensure that it is valid.", 'domain': 'youtube.commentThread', 'reason': 'processingFailure', 'location': 'body', 'locationType': 'other'}]}}

我发现 个帖子详细说明了同样的问题,似乎无法下载热门视频的所有评论。

这是我的代码:

import argparse
import urllib
import requests
import json
import time
start_time = time.time()

class YouTubeApi():

    YOUTUBE_COMMENTS_URL = 'https://www.googleapis.com/youtube/v3/commentThreads'
    comment_counter = 0
    
    with open("API_keys.txt", "r") as f:
        key_list = f.readlines()    
        key_list = [key.strip('/n') for key in key_list]


    def format_comments(self, results, likes_required):
        comments_list = []
        try:
            for item in results["items"]:
                comment = item["snippet"]["topLevelComment"]

                likes = comment["snippet"]["likeCount"]
                if likes < likes_required:
                    continue

                author = comment["snippet"]["authorDisplayName"]
                text = comment["snippet"]["textDisplay"]

                str = "Comment by {}:\n \"{}\"\n\n".format(author, text)
                str = str.encode('ascii', 'replace').decode()

                comments_list.append(str)
                self.comment_counter += 1
                print("Comments downloaded:", self.comment_counter, end="\r")
        except(KeyError):
            print(results)
            
                
             
        return comments_list

        
    def get_video_comments(self, video_id, likes_required):

        with open("API_keys.txt", "r") as f:
            key_list = f.readlines()
            
        key_list = [key.strip('/n') for key in key_list]

        if self.comment_counter <= 900000:
            key = self.key_list[0]
        elif self.comment_counter <= 1800000:
            key = self.key_list[1]
        elif self.comment_counter <= 2700000:
            key = self.key_list[2]
        elif self.comment_counter <= 3600000:
            key = self.key_list[3]
        elif self.comment_counter <= 4500000:
            key = self.key_list[4]
            
        params = {
            'part': 'snippet,replies',
            'maxResults': 100,
            'videoId': video_id,
            'textFormat': 'plainText',
            'key': key
        }
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'
        }
        try:
            #data = self.openURL(self.YOUTUBE_COMMENTS_URL, params)
            comments_data = requests.get(self.YOUTUBE_COMMENTS_URL, params=params, headers=headers)
        
        except ChunkedEncodingError:
           tries = 5
           print("Chunked Error. Retrying...")
           for n in range(tries):
               try:
                   x = 0
                   x += 1
                   print("Trying", x, "times") 
                   response = session.post("https://www.youtube.com/comment_service_ajax", params=params, data=data, headers=headers)
                   comments_data = json.loads(response.text)
               except ChunkedEncodingError as c:
                   print(c)
        results = comments_data.json()
        
        nextPageToken = results.get("nextPageToken")

        commments_list = []
        commments_list += self.format_comments(results, likes_required)

        while nextPageToken:
            params.update({'pageToken': nextPageToken})
            try:
                comments_data = requests.get(self.YOUTUBE_COMMENTS_URL, params=params, headers=headers)
            except ChunkedEncodingError as c:
                tries = 5
                print("Chunked Error. Retrying...")
                for n in range(tries):
                    try:
                        x = 0
                        x += 1
                        print("Trying", x, "times") 
                        response = session.post("https://www.youtube.com/comment_service_ajax", params=params, data=data, headers=headers)
                        comments_data = json.loads(response.text)
                    except ChunkedEncodingError as c:
                        print(c)
            
            results = comments_data.json()
            nextPageToken = results.get("nextPageToken")
            commments_list += self.format_comments(results, likes_required)

        return commments_list


    def get_video_id_list(self, filename):
        try:
            with open(filename, 'r') as file:
                URL_list = file.readlines()
        except FileNotFoundError:
            exit("File \"" + filename + "\" not found")

        list = []
        for url in URL_list:
            if url == "\n":     # ignore empty lines
                continue
            if url[-1] == '\n':     # delete '\n' at the end of line
                url = url[:-1]
            if url.find('='):   # get id
                id = url[url.find('=') + 1:]
                list.append(id)
            else:
                print("Wrong URL")

        return list


def main():
    yt = YouTubeApi()

    parser = argparse.ArgumentParser(add_help=False, description=("Download youtube comments from many videos into txt file"))
    required = parser.add_argument_group("required arguments")
    optional = parser.add_argument_group("optional arguments")
 here: https://console.developers.google.com/apis/credentials")
    optional.add_argument("--likes", '-l', help="The amount of likes a comment needs to be saved", type=int)
    optional.add_argument("--input", '-i', help="URL list file name")
    optional.add_argument("--output", '-o', help="Output file name")
    optional.add_argument("--help", '-h', help="Help", action='help')
    args = parser.parse_args()

    # --------------------------------------------------------------------- #


    likes = 0
    if args.likes:
        likes = args.likes

    input_file = "URL_list.txt"
    if args.input:
        input_file = args.input

    output_file = "Comments.txt"
    if args.output:
        output_file = args.output

    list = yt.get_video_id_list(input_file)
    if not list:
        exit("No URLs in input file")

    try:
        
        vid_counter = 0
        with open(output_file, "a") as f:
            for video_id in list:
                vid_counter += 1
                print("Downloading comments for video ", vid_counter, ", id: ", video_id, sep='')
                comments = yt.get_video_comments(video_id, likes)
                if comments:
                    for comment in comments:
                        f.write(comment)

        print('\nDone!')

    except KeyboardInterrupt:
        exit("User Aborted the Operation")

    # --------------------------------------------------------------------- #


if __name__ == '__main__':
    main()

下一个最佳方法是对它们进行随机抽样。有谁知道 API V3 是否可行?

即使 API returns 出现 processingFailure 错误,您仍然可以捕获该错误(或与此相关的任何其他 API 错误)以优雅地终止您的分页循环。这样您的脚本将提供它在第一个 API 错误发生之前从 API 中获取的顶级注释。

YouTube 数据 API 提供的 error response(通常)具有以下形式:

{
  "error": {
    "errors": [
      {
        "domain": <string>,
        "reason": <string>,
        "message": <string>,
        "locationType": <string>,
        "location": <string>
      }
    ],
    "code": <integer>,
    "message": <string>
  }
}

因此,您可以定义以下函数:

def is_error_response(response):
    error = response.get('error')
    if error is None:
        return False
    print("API Error: "
        f"code={error['code']} "
        f"domain={error['errors'][0]['domain']} "
        f"reason={error['errors'][0]['reason']} "
        f"message={error['errors'][0]['message']!r}")
    return True

您将在每个 results = comments_data.json() 形式的语句之后调用。如果该语句第一次出现,您将拥有:

results = comments_data.json()
if is_error_response(results):
   return []
nextPageToken = results.get("nextPageToken")

对于该语句的第二个实例:

results = comments_data.json()
if is_error_response(results):
   return comments_list
nextPageToken = results.get("nextPageToken")

请注意,上面的函数 is_error_responsestdout 上打印出一条错误消息,以防它的参数出现在 API 错误响应中;这是为了让脚本的用户了解 API 调用失败。