BigQuery Kafka sink connector not accepting boolean values
A schemaless BigQuery Kafka sink connector SMT fails to save data to BigQuery when a field is boolean.
Output of MapsUtil.debugPrint on the recordValue just before returning from apply(R record):
active = true java.lang.String
Schema definition
{
"mode": "NULLABLE",
"name": "active",
"type": "BOOLEAN"
}
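Deserializer
import java.io.IOException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
public class BooleanDeserialiser extends JsonDeserializer<Boolean> {
    @Override
    public Boolean deserialize(JsonParser parser, DeserializationContext context)
            throws IOException {
        // Any token text other than "0" is treated as true.
        return !"0".equals(parser.getText());
    }
}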
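Serializer
import java.io.IOException;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
public class BooleanSerialiser extends JsonSerializer<Boolean> {
    @Override
    public void serialize(Boolean value, JsonGenerator gen, SerializerProvider serializers)
            throws IOException {
        // Writes the value as a JSON string ("true"/"false"), not as a JSON boolean.
        gen.writeString(value ? "true" : "false");
    }
}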
Error
[row index 76]: invalid: Cannot convert string value to boolean: 1
at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:108)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:233)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
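I am leaving this as a community wiki answer to give this kind of issue more visibility in the community. As stated in the issue you raised with the connector devs, your request may need a feature implemented on BigQuery, which has to be requested by opening a BigQuery Feature Request. That puts the request in front of the developers so it can be considered.
It is also worth mentioning that you can see the list of issues/bugs for confluentinc kafka-connect-bigquery on the project issue dashboard.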
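Until such a feature exists, one possible workaround is to normalise the field inside the SMT, so that the record value carries a real java.lang.Boolean instead of a string such as "1" or "true". The following is only a minimal sketch under assumptions: BooleanCoercion, toBoolean and coerceField are hypothetical names that are not part of kafka-connect-bigquery, and the schemaless record value is assumed to be a Map<String, Object>. Whether this resolves the error in your setup has not been verified.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of kafka-connect-bigquery. */
final class BooleanCoercion {

    /** Converts common textual or numeric encodings to a Boolean; null stays null. */
    public static Boolean toBoolean(Object raw) {
        if (raw == null) {
            return null; // the column is NULLABLE, so nulls are passed through
        }
        if (raw instanceof Boolean) {
            return (Boolean) raw;
        }
        if (raw instanceof Number) {
            return ((Number) raw).doubleValue() != 0;
        }
        String text = raw.toString().trim();
        if (text.equalsIgnoreCase("true") || text.equals("1")) {
            return Boolean.TRUE;
        }
        if (text.equalsIgnoreCase("false") || text.equals("0")) {
            return Boolean.FALSE;
        }
        throw new IllegalArgumentException("Not a boolean value: " + raw);
    }

    /** Returns a copy of the schemaless record value with the named field coerced. */
    public static Map<String, Object> coerceField(Map<String, Object> value, String field) {
        Map<String, Object> copy = new HashMap<>(value);
        if (copy.containsKey(field)) {
            copy.put(field, toBoolean(copy.get(field)));
        }
        return copy;
    }
}
```

Inside apply(R record) this could be used as coerceField(recordValue, "active") just before building the new record, assuming recordValue is the map printed by MapsUtil.debugPrint in the question.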