时间戳不兼容(NiFi 的 PutSQL)
Timestamp incompatibility (NiFi's PutSQL)
我在使用 Nifi PutSQL 处理器将时间戳插入 PostgreSQL 数据库时遇到问题。
更具体地说,当尝试将格式为“2018-01-31T19:01:09+00:00”的日期插入 timestamptz 列时,我收到以下错误消息:
2018-04-01 19:29:40,091 ERROR [Timer-Driven Process Thread-5] o.apache.nifi.processors.standard.PutSQL PutSQL[id=7997503a-0162-1000-ee81-a0361cad5e0c] Failed to update database for StandardFlowFileRecord[uuid=d02e8b39-e564-4c37-a08a-dab8931e9890,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1522615075930-15, container=default, section=15], offset=11492, length=163],offset=0,name=32836401373126,size=163] due to java.sql.SQLDataException: The value of the sql.args.5.value is '2018-01-31T20:19:35+00:00', which cannot be converted to a timestamp; routing to failure: java.sql.SQLDataException: The value of the sql.args.5.value is '2018-01-31T20:19:35+00:00', which cannot be converted to a timestamp
java.sql.SQLDataException: The value of the sql.args.5.value is '2018-01-31T20:19:35+00:00', which cannot be converted to a timestamp
at org.apache.nifi.processors.standard.PutSQL.setParameters(PutSQL.java:711)
at org.apache.nifi.processors.standard.PutSQL.lambda$null(PutSQL.java:313)
at org.apache.nifi.processor.util.pattern.ExceptionHandler.execute(ExceptionHandler.java:127)
at org.apache.nifi.processors.standard.PutSQL.lambda$new(PutSQL.java:311)
at org.apache.nifi.processors.standard.PutSQL.lambda$new(PutSQL.java:354)
at org.apache.nifi.processor.util.pattern.PutGroup.putFlowFiles(PutGroup.java:91)
at org.apache.nifi.processor.util.pattern.Put.onTrigger(Put.java:101)
at org.apache.nifi.processors.standard.PutSQL.lambda$onTrigger(PutSQL.java:574)
at org.apache.nifi.processor.util.pattern.PartialFunctions.onTrigger(PartialFunctions.java:114)
at org.apache.nifi.processor.util.pattern.RollbackOnFailure.onTrigger(RollbackOnFailure.java:184)
at org.apache.nifi.processors.standard.PutSQL.onTrigger(PutSQL.java:574)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent.run(TimerDrivenSchedulingAgent.java:128)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.text.ParseException: Unparseable date: "2018-01-31T20:19:35+00:00"
at java.text.DateFormat.parse(DateFormat.java:366)
at org.apache.nifi.processors.standard.PutSQL.setParameter(PutSQL.java:911)
at org.apache.nifi.processors.standard.PutSQL.setParameters(PutSQL.java:707)
... 21 common frames omitted
我已经测试了从命令行将“2018-01-31T19:01:09+00:00”插入到 timestamptz 列中,它运行良好。我尝试了各种替代格式,例如:
- '2018-01-31 19:01:09+00:00'
- '2018-01-31 19:01:09+00'
- '2018-01-31T19:01:09+00'
- '2018-01-31 19:01:09'
它们在 Nifi 中都失败并出现相同的错误,即使在从命令行执行 INSERT 时它们都插入得很好。
请附上我的流程截图。如果您需要更多详细信息,请告诉我。
老实说,我宁愿避免一起进行 java 转换,因为将日期时间保留为字符串并将其直接插入到 Postgres 数据库中就可以了。我曾尝试使用 UpdateAttribute 处理器强制执行此操作,但这会导致其他错误。
我遇到过关于这个主题的各种问题,但我仍然不明白发生了什么。最值得注意的是:
- Nifi PutSQL Timestamp/Datetime error cannot be converted error
- '0000-00-00 00:00:00' can not be represented as java.sql.Timestamp error
您可以在 PutSQL 之前使用 UpdateAttribute 以及 toDate() and format() 表达式语言函数,将您的时间戳值转换为数据库可以接受的内容。
或者,您可以使用 PutDatabaseRecord 跳过 SplitText、ConvertJSONToSQL 和 PutSQL 步骤,您可以配置一个 RecordReader 来接受您的时间戳格式并进行相应的转换。如果可行,这是一个更好的解决方案,因为它将一次处理整个流文件(而不是单独的行)
我通过使用调用 python 脚本的 ExecuteStreamCommand 处理器解决了这个问题,该脚本将 JSON 行转换为相应的 SQL 插入语句。本例中感兴趣的 table 是 reddit_post
。
python 脚本的代码(我知道不需要 INSERT
参数,但这是因为我计划稍后添加 UPDATE
选项上):
import json
import argparse
import sys
# For command line arguments
parser = argparse.ArgumentParser(description='Converts JSON to respective SQL statement')
parser.add_argument('statement_type', type=str, nargs=1)
parser.add_argument('table_name', type=str, nargs=1)
# Reading the command line arguments
statement_type = parser.parse_args().statement_type[0]
table_name = parser.parse_args().table_name[0]
# Initialize SQL statement
statement = ''
for line in sys.stdin:
# Load JSON line
json_line = json.loads(line)
# Add table name and SQL syntax
if statement_type == 'INSERT':
statement += 'INSERT INTO {} '.format(table_name)
# Add table parameters and SQL syntax
statement += '({}) '.format(', '.join(json_line.keys()))
# Add table values and SQL syntax
# Note that strings are formatted with single quotes, other data types are converted to strings (for the join method)
statement += "VALUES ({});".format(', '.join("'{0}'".format(value.replace("'", "''")) if type(value) == str else str(value) for value in json_line.values()))
# Send statement to stdout
print(statement)
ExecuteStreamCommand的配置(注意Argument Delimeter设置为单个space):
流程片段:
我希望这可以帮助遇到类似问题的人。如果您对如何改进脚本、流程或其他任何内容有任何建议,请随时告诉我!
只需添加 3 位毫秒数就可以解决我使用 ExecuteSQL 处理器的问题 - "yyyy-mm-dd hh:mm:ss.sss"
我在使用 Nifi PutSQL 处理器将时间戳插入 PostgreSQL 数据库时遇到问题。
更具体地说,当尝试将格式为“2018-01-31T19:01:09+00:00”的日期插入 timestamptz 列时,我收到以下错误消息:
2018-04-01 19:29:40,091 ERROR [Timer-Driven Process Thread-5] o.apache.nifi.processors.standard.PutSQL PutSQL[id=7997503a-0162-1000-ee81-a0361cad5e0c] Failed to update database for StandardFlowFileRecord[uuid=d02e8b39-e564-4c37-a08a-dab8931e9890,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1522615075930-15, container=default, section=15], offset=11492, length=163],offset=0,name=32836401373126,size=163] due to java.sql.SQLDataException: The value of the sql.args.5.value is '2018-01-31T20:19:35+00:00', which cannot be converted to a timestamp; routing to failure: java.sql.SQLDataException: The value of the sql.args.5.value is '2018-01-31T20:19:35+00:00', which cannot be converted to a timestamp
java.sql.SQLDataException: The value of the sql.args.5.value is '2018-01-31T20:19:35+00:00', which cannot be converted to a timestamp
at org.apache.nifi.processors.standard.PutSQL.setParameters(PutSQL.java:711)
at org.apache.nifi.processors.standard.PutSQL.lambda$null(PutSQL.java:313)
at org.apache.nifi.processor.util.pattern.ExceptionHandler.execute(ExceptionHandler.java:127)
at org.apache.nifi.processors.standard.PutSQL.lambda$new(PutSQL.java:311)
at org.apache.nifi.processors.standard.PutSQL.lambda$new(PutSQL.java:354)
at org.apache.nifi.processor.util.pattern.PutGroup.putFlowFiles(PutGroup.java:91)
at org.apache.nifi.processor.util.pattern.Put.onTrigger(Put.java:101)
at org.apache.nifi.processors.standard.PutSQL.lambda$onTrigger(PutSQL.java:574)
at org.apache.nifi.processor.util.pattern.PartialFunctions.onTrigger(PartialFunctions.java:114)
at org.apache.nifi.processor.util.pattern.RollbackOnFailure.onTrigger(RollbackOnFailure.java:184)
at org.apache.nifi.processors.standard.PutSQL.onTrigger(PutSQL.java:574)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent.run(TimerDrivenSchedulingAgent.java:128)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.text.ParseException: Unparseable date: "2018-01-31T20:19:35+00:00"
at java.text.DateFormat.parse(DateFormat.java:366)
at org.apache.nifi.processors.standard.PutSQL.setParameter(PutSQL.java:911)
at org.apache.nifi.processors.standard.PutSQL.setParameters(PutSQL.java:707)
... 21 common frames omitted
我已经测试了从命令行将“2018-01-31T19:01:09+00:00”插入到 timestamptz 列中,它运行良好。我尝试了各种替代格式,例如:
- '2018-01-31 19:01:09+00:00'
- '2018-01-31 19:01:09+00'
- '2018-01-31T19:01:09+00'
- '2018-01-31 19:01:09'
它们在 Nifi 中都失败并出现相同的错误,即使在从命令行执行 INSERT 时它们都插入得很好。
请附上我的流程截图。如果您需要更多详细信息,请告诉我。
老实说,我宁愿避免一起进行 java 转换,因为将日期时间保留为字符串并将其直接插入到 Postgres 数据库中就可以了。我曾尝试使用 UpdateAttribute 处理器强制执行此操作,但这会导致其他错误。
我遇到过关于这个主题的各种问题,但我仍然不明白发生了什么。最值得注意的是:
- Nifi PutSQL Timestamp/Datetime error cannot be converted error
- '0000-00-00 00:00:00' can not be represented as java.sql.Timestamp error
您可以在 PutSQL 之前使用 UpdateAttribute 以及 toDate() and format() 表达式语言函数,将您的时间戳值转换为数据库可以接受的内容。
或者,您可以使用 PutDatabaseRecord 跳过 SplitText、ConvertJSONToSQL 和 PutSQL 步骤,您可以配置一个 RecordReader 来接受您的时间戳格式并进行相应的转换。如果可行,这是一个更好的解决方案,因为它将一次处理整个流文件(而不是单独的行)
我通过使用调用 python 脚本的 ExecuteStreamCommand 处理器解决了这个问题,该脚本将 JSON 行转换为相应的 SQL 插入语句。本例中感兴趣的 table 是 reddit_post
。
python 脚本的代码(我知道不需要 INSERT
参数,但这是因为我计划稍后添加 UPDATE
选项上):
import json
import argparse
import sys
# For command line arguments
parser = argparse.ArgumentParser(description='Converts JSON to respective SQL statement')
parser.add_argument('statement_type', type=str, nargs=1)
parser.add_argument('table_name', type=str, nargs=1)
# Reading the command line arguments
statement_type = parser.parse_args().statement_type[0]
table_name = parser.parse_args().table_name[0]
# Initialize SQL statement
statement = ''
for line in sys.stdin:
# Load JSON line
json_line = json.loads(line)
# Add table name and SQL syntax
if statement_type == 'INSERT':
statement += 'INSERT INTO {} '.format(table_name)
# Add table parameters and SQL syntax
statement += '({}) '.format(', '.join(json_line.keys()))
# Add table values and SQL syntax
# Note that strings are formatted with single quotes, other data types are converted to strings (for the join method)
statement += "VALUES ({});".format(', '.join("'{0}'".format(value.replace("'", "''")) if type(value) == str else str(value) for value in json_line.values()))
# Send statement to stdout
print(statement)
ExecuteStreamCommand的配置(注意Argument Delimeter设置为单个space):
流程片段:
我希望这可以帮助遇到类似问题的人。如果您对如何改进脚本、流程或其他任何内容有任何建议,请随时告诉我!
只需添加 3 位毫秒数就可以解决我使用 ExecuteSQL 处理器的问题 - "yyyy-mm-dd hh:mm:ss.sss"