如何在 NiFi ValidateRecord 处理器/JsonRecordSetWriter 中将时间戳序列化为 Json 字段
How to serialize Timestamp into Json field in NiFi ValidateRecord processor / JsonRecordSetWriter
如何将时间戳序列化为 NiFi ValidateRecord
处理器中的 Json 字段 / JsonRecordSetWriter
.
输入时,我有一个 CSV 文件,其中时间戳列的格式为 yyyy-MM-dd HH:mm:ss.SSS
。
在我的 NiFi Flow 中,我有一个 ValidateRecord
处理器,它使用 CSVReader
进行读取,JsonRecordSetWriter
作为写入。他们都使用 Avro 模式,时间戳字段定义为
"fields" : [ {
"name" : "timestamp",
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis"
},
"doc" : "Type inferred from '2016/10/08 07:51:00.000'"
}, {
...
当字段值如 2016-10-08 07:51:00.000
的记录通过时,我在 NiFi 日志中遇到异常:
2018-10-18 17:05:59,135 ERROR [Timer-Driven Process Thread-1] o.a.n.processors.standard.ValidateRecord ValidateRecord[id=3d44915d-a52a-3eb0-1ae1-7b0cbe4b1a03] Failed to write MapRecord[{timestamp=2016-10-08 07:51:00.0, ... ] with schema {"type":"record","name":"redfunnel","doc":"Schema generated by Kite","fields":[{"name":"timestamp","type":{"type":"long","logicalType":"timestamp-millis"},"doc":"Type inferred from '2016/10/08 07:51:00.000'"},{ .... }]} as a JSON Object due to java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp): java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp)
java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp)
at org.codehaus.jackson.impl.JsonGeneratorBase._writeSimpleObject(JsonGeneratorBase.java:556)
at org.codehaus.jackson.impl.JsonGeneratorBase.writeObject(JsonGeneratorBase.java:317)
at org.apache.nifi.json.WriteJsonResult.writeRawValue(WriteJsonResult.java:267)
at org.apache.nifi.json.WriteJsonResult.writeRecord(WriteJsonResult.java:201)
at org.apache.nifi.json.WriteJsonResult.writeRawRecord(WriteJsonResult.java:149)
at org.apache.nifi.processors.standard.ValidateRecord.onTrigger(ValidateRecord.java:342)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
在我的 JsonRecordSetWriter
的属性中,我尝试将时间戳记的格式指定为 yyyy-MM-dd HH:mm:ss.SSS
但不幸的是没有成功,我仍然在 NiFi 日志中遇到相同的异常。
这是否意味着 JsonRecordSetWriter
默认情况下无法序列化 java.time.Timestamp
,即使它具有 Timestamp Format
属性 来配置看似完全相同?
是否可以使用开箱即用的 NiFi 组件根据自定义格式编写时间戳,或者我必须修改 JsonRecordSetWriter
?
更新
跟进代码,this code branch 抛出了我的异常。
似乎是未通过验证的无效记录的分支。也许我的错误只发生在无效记录上。
看来我找到了适合我的情况的配置。
我不得不将模式分成两部分:一个用于输入,另一个用于输出。
因此,schema1 将时间戳字段定义为:
{
"name" : "timestamp",
"type" : "string",
"doc" : "Type inferred from '2016/10/08 07:51:00.000'"
}
和schema2将时间戳字段定义为
{
"name" : "timestamp",
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis"
},
"doc" : "Type inferred from '2016/10/08 07:51:00.000'"
}
现在我正在使用
配置 ValidateRecord
处理器
- 使用 schema1 的 CSVReader
- JsonRecordSetWriter 使用 schema2
- 具有 schema1
的 ValidateRecord 的 "Schema Text" 字段
之后,记录顺利通过我的 ValidateRecord
处理器,并使用 PutDatabaseRecord
处理器进入 Postgres 数据库的 timestamp
字段,该处理器使用 JsonTreeReader
配置 schema2.
同样重要的是使用正确的字符串格式配置 JsonTreeReader
的时间戳格式 属性,例如'yyyy-MM-dd HH:mm:ss.SSS' 就我而言。
希望这对遇到类似情况的人有所帮助。
如何将时间戳序列化为 NiFi ValidateRecord
处理器中的 Json 字段 / JsonRecordSetWriter
.
输入时,我有一个 CSV 文件,其中时间戳列的格式为 yyyy-MM-dd HH:mm:ss.SSS
。
在我的 NiFi Flow 中,我有一个 ValidateRecord
处理器,它使用 CSVReader
进行读取,JsonRecordSetWriter
作为写入。他们都使用 Avro 模式,时间戳字段定义为
"fields" : [ {
"name" : "timestamp",
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis"
},
"doc" : "Type inferred from '2016/10/08 07:51:00.000'"
}, {
...
当字段值如 2016-10-08 07:51:00.000
的记录通过时,我在 NiFi 日志中遇到异常:
2018-10-18 17:05:59,135 ERROR [Timer-Driven Process Thread-1] o.a.n.processors.standard.ValidateRecord ValidateRecord[id=3d44915d-a52a-3eb0-1ae1-7b0cbe4b1a03] Failed to write MapRecord[{timestamp=2016-10-08 07:51:00.0, ... ] with schema {"type":"record","name":"redfunnel","doc":"Schema generated by Kite","fields":[{"name":"timestamp","type":{"type":"long","logicalType":"timestamp-millis"},"doc":"Type inferred from '2016/10/08 07:51:00.000'"},{ .... }]} as a JSON Object due to java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp): java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp)
java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp)
at org.codehaus.jackson.impl.JsonGeneratorBase._writeSimpleObject(JsonGeneratorBase.java:556)
at org.codehaus.jackson.impl.JsonGeneratorBase.writeObject(JsonGeneratorBase.java:317)
at org.apache.nifi.json.WriteJsonResult.writeRawValue(WriteJsonResult.java:267)
at org.apache.nifi.json.WriteJsonResult.writeRecord(WriteJsonResult.java:201)
at org.apache.nifi.json.WriteJsonResult.writeRawRecord(WriteJsonResult.java:149)
at org.apache.nifi.processors.standard.ValidateRecord.onTrigger(ValidateRecord.java:342)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
在我的 JsonRecordSetWriter
的属性中,我尝试将时间戳记的格式指定为 yyyy-MM-dd HH:mm:ss.SSS
但不幸的是没有成功,我仍然在 NiFi 日志中遇到相同的异常。
这是否意味着 JsonRecordSetWriter
默认情况下无法序列化 java.time.Timestamp
,即使它具有 Timestamp Format
属性 来配置看似完全相同?
是否可以使用开箱即用的 NiFi 组件根据自定义格式编写时间戳,或者我必须修改 JsonRecordSetWriter
?
更新
跟进代码,this code branch 抛出了我的异常。 似乎是未通过验证的无效记录的分支。也许我的错误只发生在无效记录上。
看来我找到了适合我的情况的配置。
我不得不将模式分成两部分:一个用于输入,另一个用于输出。
因此,schema1 将时间戳字段定义为:
{
"name" : "timestamp",
"type" : "string",
"doc" : "Type inferred from '2016/10/08 07:51:00.000'"
}
和schema2将时间戳字段定义为
{
"name" : "timestamp",
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis"
},
"doc" : "Type inferred from '2016/10/08 07:51:00.000'"
}
现在我正在使用
配置ValidateRecord
处理器
- 使用 schema1 的 CSVReader
- JsonRecordSetWriter 使用 schema2
- 具有 schema1 的 ValidateRecord 的 "Schema Text" 字段
之后,记录顺利通过我的 ValidateRecord
处理器,并使用 PutDatabaseRecord
处理器进入 Postgres 数据库的 timestamp
字段,该处理器使用 JsonTreeReader
配置 schema2.
同样重要的是使用正确的字符串格式配置 JsonTreeReader
的时间戳格式 属性,例如'yyyy-MM-dd HH:mm:ss.SSS' 就我而言。
希望这对遇到类似情况的人有所帮助。