How to create an email notification for changes to files in a storage bucket
How can I create an email notification to an email address (john.citizen@gmail.com) when a file in a storage bucket changes, i.e. a new file is added, appended to, overwritten, or an update fails? I have just started using GCP.
GCP has no 'mail-me' option for when Cloud Storage changes, but you can receive the notification in your application and send the email from there.
There are two ways to do this:
Object Change Notifications will send an HTTP POST to your app.
Pub/Sub storage notifications (recommended by Google) publish a Pub/Sub message whenever a file is created, modified, or deleted. Pub/Sub can perform HTTP POSTs, trigger Cloud Functions, trigger Cloud Run (similar to Functions, but dockerized), or be polled; a setup sketch follows.
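If you go the Pub/Sub route, wiring a bucket to a topic can be done with the google-cloud-storage client. This is a minimal sketch, assuming the bucket and topic already exist and that 'your-bucket' / 'your-topic' are placeholders (the bucket's service agent also needs publish rights on the topic):

from google.cloud import storage
from google.cloud.storage import notification

client = storage.Client()
bucket = client.bucket('your-bucket')  # placeholder bucket name
config = bucket.notification(
    topic_name='your-topic',  # placeholder; topic must already exist
    payload_format=notification.JSON_API_V1_PAYLOAD_FORMAT,
    event_types=[
        notification.OBJECT_FINALIZE_EVENT_TYPE,         # create / overwrite
        notification.OBJECT_METADATA_UPDATE_EVENT_TYPE,  # metadata change
        notification.OBJECT_DELETE_EVENT_TYPE,           # delete
    ],
)
config.create()  # registers the notification config on the bucket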
Google also has a Sending mails tutorial; a sketch combining it with a Pub/Sub trigger follows.
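Putting the two together: below is a minimal sketch of a Pub/Sub-triggered Cloud Function that sends the email, assuming the SendGrid client used in that tutorial, a SENDGRID_API_KEY environment variable, and a placeholder sender address:

import base64
import json
import os

from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail

def notify_by_email(event, context):
    # Pub/Sub delivers the storage notification as base64-encoded JSON.
    payload = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    event_type = event.get('attributes', {}).get('eventType', 'change')
    message = Mail(
        from_email='noreply@your-domain.example',  # placeholder sender
        to_emails='john.citizen@gmail.com',
        subject='Change in bucket {}'.format(payload.get('bucket')),
        plain_text_content='Object {}: {}'.format(payload.get('name'), event_type))
    SendGridAPIClient(os.environ['SENDGRID_API_KEY']).send(message)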
There is one corner case you might find useful:
If
- the volume is very low, and
- file creation/update/delete happens one by one, and
- you don't care which file was changed/created/updated, and
- losing a notification is not important,
then you can:
- set up a Pub/Sub queue with a low retention (<5 minutes) (see the sketch after this list),
- set up an alert for when the queue has more than one message,
- and Google will email you when that happens.
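A sketch of the queue part, with placeholder project/topic/subscription names; note that, as an assumption about current API limits, message_retention_duration appears to be floored at 10 minutes, so use the shortest value the API accepts. The alert itself is configured in Cloud Monitoring on the subscription's num_undelivered_messages metric:

from google.cloud import pubsub_v1
from google.protobuf import duration_pb2

subscriber = pubsub_v1.SubscriberClient()
with subscriber:
    subscriber.create_subscription(
        request={
            'name': subscriber.subscription_path('your-project', 'low-retention-sub'),
            'topic': 'projects/your-project/topics/your-topic',
            # Shortest retention the API allows; older messages are dropped.
            'message_retention_duration': duration_pb2.Duration(seconds=600),
        }
    )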
For reference, here is a Cloud Function (trigger type: a storage bucket event) that loads uploaded CSVs into BigQuery; the commented-out send_text_message_to_teams calls mark the spots where you would hook in your own notifications:

from google.cloud import bigquery, storage

# Only change the dataset name.
def process_request(event, context):
    try:
        # Change bigquery_dataset_name to the dataset you have created.
        bigquery_dataset_name = 'Your dataset name'
        # Don't change anything from here on.
        # When creating the function, set the trigger type to the storage bucket event.
        source_bucket_name = event['bucket']
        blob_name = event['name']
        # Call function to notify bucket updates.
        #send_text_message_to_teams("{} has been received in {}".format(blob_name, source_bucket_name))
        storage_client = storage.Client()
        bigquery_client = bigquery.Client()
        source_bucket = storage_client.bucket(source_bucket_name)
        source_blob = source_bucket.blob(blob_name)
        # If the file is a csv, define the job_config, uri, file name, table name
        # and table id, and then run the load job.
        if source_blob.name.split('.')[-1] == 'csv':
            job_config = bigquery.LoadJobConfig(
                skip_leading_rows=1,
                autodetect=True,
                source_format=bigquery.SourceFormat.CSV,
                write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE)
            uri = 'gs://{}/{}'.format(source_bucket_name, source_blob.name)
            file_name = '.'.join(source_blob.name.split('/')[-1].split('.')[0:-1])
            table_name = ''.join([character if character.isalnum() else '_' for character in file_name])
            table_id = '{}.{}.{}'.format(bigquery_client.project, bigquery_dataset_name, table_name)
            print('Transferring {} into {}'.format(source_blob.name, table_id))
            # Run the load job using the details above.
            load_job = bigquery_client.load_table_from_uri(uri, table_id, job_config=job_config)
            load_job.result()
            print("table updated")
            print('{} has been processed.'.format(source_blob.name))
            # Call function to notify table updates.
            #send_text_message_to_teams("{} has been updated".format(table_id))
        else:
            print('{} is not a csv.'.format(source_blob.name))
    except Exception as e:
        # Call function to notify failures.
        #send_text_message_to_teams("function-uploadcsv has encountered an issue. The details are {}".format(e))
        print('function-uploadcsv has encountered an issue. The details are {}'.format(e))