Python Apache Beam:日期值超出范围
Python Apache Beam: date value out of range
应用 this or this 示例来构建我的程序,每次尝试插入到 Big Query 时,我都会遇到此错误:
OverflowError: 日期值超出范围 [while 运行 'Format']
我的 Beam 管道是这样的:
Bigquery = (transformation
| 'Format' >> beam.ParDo(FormatBigQueryoFn())
| 'Write to BigQuery' >> beam.io.Write(beam.io.BigQuerySink(
'XXXX',
schema=TABLE_SCHEMA,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)))
在classFormatBigQueryoFn中应该是window数据时间
的逻辑
例1的代码:
def timestamp2str(t, fmt='%Y-%m-%d %H:%M:%S.000'):
"""Converts a unix timestamp into a formatted string."""
return datetime.fromtimestamp(t).strftime(fmt)
class TeamScoresDict(beam.DoFn):
"""Formats the data into a dictionary of BigQuery columns with their values
Receives a (team, score) pair, extracts the window start timestamp, and
formats everything together into a dictionary. The dictionary is in the format
{'bigquery_column': value}
"""
def process(self, team_score, window=beam.DoFn.WindowParam):
team, score = team_score
start = timestamp2str(int(window.start))
yield {
'team': team,
'total_score': score,
'window_start': start,
'processing_time': timestamp2str(int(time.time()))
}
例2的代码:
class FormatDoFn(beam.DoFn):
def process(self, element, window=beam.DoFn.WindowParam):
ts_format = '%Y-%m-%d %H:%M:%S.%f UTC'
window_start = window.start.to_utc_datetime().strftime(ts_format)
window_end = window.end.to_utc_datetime().strftime(ts_format)
return [{'word': element[0],
'count': element[1],
'window_start':window_start,
'window_end':window_end}]
我的管道可能出了什么问题?
编辑:
例如,如果我打印 window.start,我会得到:
Timestamp(-9223372036860)
问题是我之前从文件中读取数据以使用 Google Pub/Sub.
对其进行测试
当我从文件中读取数据时,元素没有时间戳。
元素中必须有时间戳。
Pub/Sub 自动附加此时间戳。
最简单的 windowing 形式是使用固定时间 windows:给定一个可能不断更新的带时间戳的 PCollection,每个 window 可能捕获(对于示例)时间戳落在五分钟间隔内的所有元素。
应用 this or this 示例来构建我的程序,每次尝试插入到 Big Query 时,我都会遇到此错误:
OverflowError: 日期值超出范围 [while 运行 'Format']
我的 Beam 管道是这样的:
Bigquery = (transformation
| 'Format' >> beam.ParDo(FormatBigQueryoFn())
| 'Write to BigQuery' >> beam.io.Write(beam.io.BigQuerySink(
'XXXX',
schema=TABLE_SCHEMA,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)))
在classFormatBigQueryoFn中应该是window数据时间
的逻辑例1的代码:
def timestamp2str(t, fmt='%Y-%m-%d %H:%M:%S.000'):
"""Converts a unix timestamp into a formatted string."""
return datetime.fromtimestamp(t).strftime(fmt)
class TeamScoresDict(beam.DoFn):
"""Formats the data into a dictionary of BigQuery columns with their values
Receives a (team, score) pair, extracts the window start timestamp, and
formats everything together into a dictionary. The dictionary is in the format
{'bigquery_column': value}
"""
def process(self, team_score, window=beam.DoFn.WindowParam):
team, score = team_score
start = timestamp2str(int(window.start))
yield {
'team': team,
'total_score': score,
'window_start': start,
'processing_time': timestamp2str(int(time.time()))
}
例2的代码:
class FormatDoFn(beam.DoFn):
def process(self, element, window=beam.DoFn.WindowParam):
ts_format = '%Y-%m-%d %H:%M:%S.%f UTC'
window_start = window.start.to_utc_datetime().strftime(ts_format)
window_end = window.end.to_utc_datetime().strftime(ts_format)
return [{'word': element[0],
'count': element[1],
'window_start':window_start,
'window_end':window_end}]
我的管道可能出了什么问题?
编辑:
例如,如果我打印 window.start,我会得到:
Timestamp(-9223372036860)
问题是我之前从文件中读取数据以使用 Google Pub/Sub.
对其进行测试当我从文件中读取数据时,元素没有时间戳。
元素中必须有时间戳。
Pub/Sub 自动附加此时间戳。
最简单的 windowing 形式是使用固定时间 windows:给定一个可能不断更新的带时间戳的 PCollection,每个 window 可能捕获(对于示例)时间戳落在五分钟间隔内的所有元素。