Parsing dates in format dd.MM.yyyy in Kafka Connect using kafka-connect-spooldir connector
I am trying to use the SpoolDirCsvSourceConnector from https://github.com/jcustenborder/kafka-connect-spooldir
I have the following configuration for the connector in Kafka Connect:
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector
csv.first.row.as.header=true
finished.path=/csv/finished
tasks.max=1
parser.timestamp.date.formats=[dd.MM.yyyy, yyyy-MM-dd'T'HH:mm:ss, yyyy-MM-dd' 'HH:mm:ss]
key.schema={"name":"com.github.jcustenborder.kafka.connect.model.Key","type":"STRUCT","isOptional":false,"fieldSchemas":{}}
csv.separator.char=59
input.file.pattern=umsaetze_.*.csv
topic=test-csv
error.path=/csv/error
input.path=/csv/input
value.schema={"name":"com.github.jcustenborder.kafka.connect.model.Value","type":"STRUCT","isOptional":false,"fieldSchemas":{"Buchungstag":{"name":"org.apache.kafka.connect.data.Timestamp","type":"INT64","version":1,"isOptional":true},"Wertstellung":{"name":"org.apache.kafka.connect.data.Timestamp","type":"INT64","version":1,"isOptional":true},"Vorgang":{"type":"STRING","isOptional":false},"Buchungstext":{"type":"STRING","isOptional":false},"Umsatz":{"name":"org.apache.kafka.connect.data.Decimal","type":"BYTES","version":1,"parameters":{"scale":"2"},"isOptional":true}}}
The value schema looks like this:
{
  "name": "com.github.jcustenborder.kafka.connect.model.Value",
  "type": "STRUCT",
  "isOptional": false,
  "fieldSchemas": {
    "Buchungstag": {
      "name": "org.apache.kafka.connect.data.Date",
      "type": "INT32",
      "version": 1,
      "isOptional": true
    },
    "Wertstellung": {
      "name": "org.apache.kafka.connect.data.Timestamp",
      "type": "INT64",
      "version": 1,
      "isOptional": true
    },
    "Vorgang": {
      "type": "STRING",
      "isOptional": false
    },
    "Buchungstext": {
      "type": "STRING",
      "isOptional": false
    },
    "Umsatz": {
      "name": "org.apache.kafka.connect.data.Decimal",
      "type": "BYTES",
      "version": 1,
      "parameters": {
        "scale": "2"
      },
      "isOptional": true
    }
  }
}
I have also tried Date instead of Timestamp for those fields:
{
  "name" : "org.apache.kafka.connect.data.Date",
  "type" : "INT32",
  "version" : 1,
  "isOptional" : true
}
Neither Timestamp nor Date works for me; both fail on the Buchungstag and Wertstellung fields with the same exception as shown below. I tried to solve it with the parser.timestamp.date.formats option, but it did not help.
Here is a sample of the CSV file I am trying to import into Kafka:
Buchungstag;Wertstellung;Vorgang;Buchungstext;Umsatz;
08.02.2019;08.02.2019;Lastschrift / Belastung;Auftraggeber: BlablaBuchungstext: Fahrschein XXXXXX Ref. U3436346/8423;-55,60;
08.02.2019;08.02.2019;Lastschrift / Belastung;Auftraggeber: Bank AGBuchungstext: 01.02.209:189,34 Ref. ZMKDVSDVS/5620;-189,34;
I get the following exception in Kafka Connect:
org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.DataException: Exception thrown while parsing data for 'Buchungstag'. linenumber=2
at com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask.read(AbstractSourceTask.java:277)
at com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask.poll(AbstractSourceTask.java:144)
... 10 more
Caused by: org.apache.kafka.connect.errors.DataException: Could not parse '08.02.2019' to 'Date'
at com.github.jcustenborder.kafka.connect.utils.data.Parser.parseString(Parser.java:113)
... 11 more
Caused by: java.lang.IllegalStateException: Could not parse '08.02.2019' to java.util.Date
at com.google.common.base.Preconditions.checkState(Preconditions.java:588)
... 12 more
Do you know what the value schema should look like to parse dates such as 01.01.2001?
I think the problem is with your parser.timestamp.date.formats value. You passed [dd.MM.yyyy, yyyy-MM-dd'T'HH:mm:ss, yyyy-MM-dd' 'HH:mm:ss].
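To see why a value like that breaks parsing, here is a minimal Java sketch. The class name DateFormatCheck is my own, and I am assuming the connector applies each configured entry as a java.text.SimpleDateFormat-style pattern (which the syntax suggests, but I have not verified its internals). If the surrounding brackets survive list splitting, the first entry becomes the pattern [dd.MM.yyyy, which expects a literal [ and therefore cannot match 08.02.2019:

import java.text.ParseException;
import java.text.SimpleDateFormat;

public class DateFormatCheck {
    public static void main(String[] args) {
        // The intended pattern parses the CSV value without problems.
        tryParse("dd.MM.yyyy", "08.02.2019");
        // The same value with the stray bracket from the config fails,
        // because '[' is treated as a required literal prefix.
        tryParse("[dd.MM.yyyy", "08.02.2019");
    }

    private static void tryParse(String pattern, String value) {
        try {
            System.out.println("'" + pattern + "' -> " + new SimpleDateFormat(pattern).parse(value));
        } catch (ParseException e) {
            System.out.println("'" + pattern + "' -> could not parse '" + value + "'");
        }
    }
}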
In the configuration, that property (parser.timestamp.date.formats) is defined as a List. A list is passed as a single string with a comma delimiter (,).
In your case it should be: dd.MM.yyyy,yyyy-MM-dd'T'HH:mm:ss,yyyy-MM-dd' 'HH:mm:ss. The spaces around the commas may also be part of the problem, so it is safest to leave them out as well.
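Putting that together, a corrected configuration could look like the sketch below. It is simply the configuration from the question with the parser.timestamp.date.formats line rewritten as a plain comma-separated string; whether anything else needs to change depends on your connector version:

connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector
csv.first.row.as.header=true
tasks.max=1
parser.timestamp.date.formats=dd.MM.yyyy,yyyy-MM-dd'T'HH:mm:ss,yyyy-MM-dd' 'HH:mm:ss
csv.separator.char=59
input.file.pattern=umsaetze_.*.csv
topic=test-csv
input.path=/csv/input
error.path=/csv/error
finished.path=/csv/finished
key.schema={"name":"com.github.jcustenborder.kafka.connect.model.Key","type":"STRUCT","isOptional":false,"fieldSchemas":{}}
value.schema={"name":"com.github.jcustenborder.kafka.connect.model.Value","type":"STRUCT","isOptional":false,"fieldSchemas":{"Buchungstag":{"name":"org.apache.kafka.connect.data.Timestamp","type":"INT64","version":1,"isOptional":true},"Wertstellung":{"name":"org.apache.kafka.connect.data.Timestamp","type":"INT64","version":1,"isOptional":true},"Vorgang":{"type":"STRING","isOptional":false},"Buchungstext":{"type":"STRING","isOptional":false},"Umsatz":{"name":"org.apache.kafka.connect.data.Decimal","type":"BYTES","version":1,"parameters":{"scale":"2"},"isOptional":true}}}

If you submit the connector through the Connect REST API instead of a properties file, the same comma-separated string should go into the JSON value for parser.timestamp.date.formats, since list-typed settings are supplied as strings there as well.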