Convert long to int64 in Kafka Connect
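I'm trying to write a custom Single Message Transform (SMT) in Kafka Connect. From a field of type int64/Date I produce a long value, but when I try to write that long back into updatedValue, which has the same schema (type int64), I run into a problem: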
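for (Field field : value.schema().fields()) {
    final Object origFieldValue = value.get(field);
    if (timeField.equals(field.name())) {
        long date = convertDate(origFieldValue);
        // this put is where the DataException below is thrown
        updatedValue.put(field, date);
    } else {
        // copy all other fields unchanged (without the else, this line would overwrite the converted value)
        updatedValue.put(field, origFieldValue);
    }
}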
The error is:
[2020-05-14 14:31:52,120] INFO WorkerSourceTask{id=datechanger} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
connect_1 | [2020-05-14 14:31:52,120] INFO WorkerSourceTask{id=datechanger-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
connect_1 | [2020-05-14 14:31:52,120] ERROR WorkerSourceTask{id=datechanger-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
connect_1 | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
connect_1 | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
connect_1 | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
connect_1 | at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:320)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)
connect_1 | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
connect_1 | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
connect_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1 | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect_1 | at java.lang.Thread.run(Thread.java:748)
connect_1 | Caused by: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.lang.Long for field: "date"
Is there any way to convert that long value back to int64 so that it fits the schema?
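I ran into the same problem. In my case I had specified Date.SCHEMA from the org.apache.kafka.connect.data package as the field's schema.
With such a logical-type schema you don't pass a Long value at all; you pass the java.util.Date itself, and the schema validator accepts it.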
I found this by digging through the source here:
https://github.com/apache/kafka/blob/2.6/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L69
https://github.com/apache/kafka/blob/2.6/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L271
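A minimal, stdlib-only sketch of the fix, assuming the question's convertDate returns epoch milliseconds. One detail worth knowing: in Kafka Connect the Date logical type is built on INT32, while Timestamp is built on INT64, so an INT64 field like the one in the error is most likely a Timestamp. Both expect a java.util.Date value. The class TimestampFieldValues and its methods are hypothetical names for illustration, not part of Kafka.

```java
import java.util.Date;

// Hypothetical helper: converts between epoch millis and the value class
// that Connect's Date/Time/Timestamp logical types expect (java.util.Date).
public final class TimestampFieldValues {
    private TimestampFieldValues() {}

    // Wrap epoch milliseconds so the value passes validation for a
    // Timestamp-typed (INT64 logical) field.
    public static Date toLogicalValue(long epochMillis) {
        return new Date(epochMillis);
    }

    // Read epoch milliseconds from a field value that is either already a
    // java.util.Date (logical type) or a plain Long (raw INT64 schema).
    public static long toEpochMillis(Object fieldValue) {
        if (fieldValue instanceof Date) {
            return ((Date) fieldValue).getTime();
        }
        if (fieldValue instanceof Long) {
            return (Long) fieldValue;
        }
        throw new IllegalArgumentException("Unsupported timestamp value: " + fieldValue);
    }
}
```

In the question's loop, the failing line would then become updatedValue.put(field, TimestampFieldValues.toLogicalValue(date)); assuming the field's schema is Timestamp.SCHEMA.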
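The problem is that ConnectSchema's validator looks the field up in the LOGICAL_TYPE_CLASSES map before falling back to SCHEMA_TYPE_CLASSES, so a field whose schema carries a logical-type name is validated as a date (expecting java.util.Date), not as a plain INT64. The error message, which names only the physical type INT64, is therefore misleading.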
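To make the lookup order concrete, here is a simplified model of the check described above. It is not Kafka's code: the class name ValidationOrderModel, the string-based type names, and the use of IllegalArgumentException (Kafka throws DataException) are assumptions for illustration. Only the order of the two lookups and the wording of the error message are meant to mirror ConnectSchema.

```java
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the ConnectSchema lookup order (not the real implementation).
public final class ValidationOrderModel {
    // Physical schema type -> accepted Java classes (subset, for illustration)
    private static final Map<String, List<Class<?>>> SCHEMA_TYPE_CLASSES = new HashMap<>();
    // Logical type name -> accepted Java classes (subset, for illustration)
    private static final Map<String, List<Class<?>>> LOGICAL_TYPE_CLASSES = new HashMap<>();

    static {
        SCHEMA_TYPE_CLASSES.put("INT32", Arrays.<Class<?>>asList(Integer.class));
        SCHEMA_TYPE_CLASSES.put("INT64", Arrays.<Class<?>>asList(Long.class));
        LOGICAL_TYPE_CLASSES.put("org.apache.kafka.connect.data.Date", Arrays.<Class<?>>asList(Date.class));
        LOGICAL_TYPE_CLASSES.put("org.apache.kafka.connect.data.Timestamp", Arrays.<Class<?>>asList(Date.class));
    }

    // The logical-type lookup wins whenever the schema has a known logical name;
    // the physical type is only consulted as a fallback.
    public static List<Class<?>> expectedClassesFor(String type, String logicalName) {
        List<Class<?>> expected = logicalName == null ? null : LOGICAL_TYPE_CLASSES.get(logicalName);
        if (expected == null) {
            expected = SCHEMA_TYPE_CLASSES.get(type);
        }
        return expected;
    }

    // Rejects a (non-null) value whose class is not accepted; note the message
    // mentions only the physical type, even when the logical type decided.
    public static void validateValue(String fieldName, String type, String logicalName, Object value) {
        List<Class<?>> expected = expectedClassesFor(type, logicalName);
        boolean ok = expected != null && expected.stream().anyMatch(c -> c.isInstance(value));
        if (!ok) {
            throw new IllegalArgumentException("Invalid Java object for schema type " + type
                    + ": " + value.getClass() + " for field: \"" + fieldName + "\"");
        }
    }
}
```

With a plain INT64 schema a Long passes; with the Timestamp logical name on the same INT64 type, a Long is rejected with exactly the kind of message shown in the question, while a java.util.Date passes.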