How to create Email notification for changes to files in storage bucket

How do I create an email notification to an email address (john.citizen@gmail.com) when files in a storage bucket change, i.e. a new file is added, appended to, overwritten, or an update fails? I have just started using GCP.

GCP has no built-in 'mail-me' option for Cloud Storage changes, but you can receive the notifications in your own application and send the email from there.

There are two ways to do this:

  • Object Change Notifications will send an HTTP POST to your application.

  • Pub/Sub storage notifications (recommended by Google) publish a Pub/Sub message whenever a file is created, modified, or deleted. Pub/Sub can then perform HTTP POSTs, trigger Cloud Functions, trigger Cloud Run (similar to Functions, but dockerized), or be polled.

Google also has a Sending mails tutorial.
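As a minimal sketch of the Pub/Sub route: the function below decodes the Cloud Storage notification delivered through a Pub/Sub trigger and builds the email text. The sender address, SMTP host, and function name here are assumptions (the recipient address comes from the question); the topic/subscription setup and a real mail relay are left to you.

```python
import base64
import json
import smtplib
from email.message import EmailMessage

def format_notification_email(event):
    """Turn a Pub/Sub-delivered storage notification into an email message.

    event['data'] is the base64-encoded JSON object that Cloud Storage
    publishes; the change type arrives in event['attributes']['eventType'].
    """
    payload = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    event_type = event.get('attributes', {}).get('eventType', 'UNKNOWN')
    msg = EmailMessage()
    msg['Subject'] = '{}: {}/{}'.format(event_type, payload['bucket'], payload['name'])
    msg['To'] = 'john.citizen@gmail.com'  # address from the question
    msg.set_content('Object gs://{}/{} changed (event: {}).'.format(
        payload['bucket'], payload['name'], event_type))
    return msg

def notify_by_email(event, context):
    """Pub/Sub-triggered Cloud Function entry point (sketch)."""
    msg = format_notification_email(event)
    msg['From'] = 'alerts@example.com'            # assumption: your sender
    with smtplib.SMTP('smtp.example.com') as s:   # assumption: your SMTP relay
        s.send_message(msg)
```

Deploying this with a trigger on the bucket's notification topic gives you one email per change; for real traffic you would batch or rate-limit the sends.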

There is one edge case you may find useful:

If

  • the volume is very low, and
  • file creation/update/delete happens one-by-one, and
  • you don't care which file was changed/created/updated, and
  • losing a notification doesn't matter,

then you can:

  • Set up a Pub/Sub queue with a low retention period (<5 minutes).
  • Set an alert for when the queue holds more than one message.
  • Google will then email you whenever that happens.
# Only change the dataset name
from google.cloud import bigquery, storage

def process_request(event, context):
    try:
    
        # Change the bigquery_dataset Name according to what you have created.
        bigquery_dataset_name = 'Your dataset name'

        # Don't change anything from here on.
        # When creating the function, set the trigger type to Cloud Storage bucket.

        source_bucket_name = event['bucket']
        blob_name = event['name']
        # call function to notify bucket updates
        # send_text_message_to_teams("{} has been received in {}".format(blob_name, source_bucket_name))
        storage_client = storage.Client()
        bigquery_client = bigquery.Client()
        source_bucket = storage_client.bucket(source_bucket_name) 
        source_blob = source_bucket.blob(blob_name)
        
        # If the file is a csv, define job_config, uri, file name, table name and table id, then run the load job
        if source_blob.name.split('.')[-1] == 'csv':
            job_config = bigquery.LoadJobConfig(
                    skip_leading_rows=1,
                    autodetect=True,
                    source_format=bigquery.SourceFormat.CSV,
                    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE)
            uri = 'gs://{}/{}'.format(source_bucket_name, source_blob.name)
            file_name = '.'.join(source_blob.name.split('/')[-1].split('.')[0:-1])
            table_name = ''.join([character if character.isalnum() else '_' for character in file_name])
            table_id = '{}.{}.{}'.format(bigquery_client.project, bigquery_dataset_name, table_name)
            print('Transferring {} into {}'.format(source_blob.name, table_id))
            
            # load the job using the details above
            load_job = bigquery_client.load_table_from_uri(uri, table_id, job_config=job_config)
            load_job.result()
            print("table updated")
            print('{} has been processed.'.format(source_blob.name))
            # call function to notify table updates
            #send_text_message_to_teams("{} has been updated".format(table_id))
        else:
            print('{} is not a csv.'.format(source_blob.name))
    except Exception as e:
        # call function to notify failures
        # send_text_message_to_teams("function-uploadcsv has encountered an issue. The details are {}".format(e))
        print('function-uploadcsv has encountered an issue. The details are {}'.format(e))