Append a newline character to every record passing through the CamelKafkaAzureBlobSink connector

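We are currently working on a data pipeline stack in which we use the CamelAzurestorageblobSinkConnector (0.9.x). It reads a specific topic from Kafka (cp-kafka-5.0.0) and appends each record to a specific Azure AppendBlob.

The sync works well, but we have found one glitch in the stack.

The JSON records are appended to the blob file without any line break between them, as shown below: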

{"uuid":"6e7190e2-987d-44f5-9b20-ba854d8d4274","foo":"bar"}{"uuid":"6f0d3912-b7c1-4cc4-a41b-0d54cd623373","foo":"bar"}{"foo":"bar"}

This affects further processing of the blob file.

Our CamelAzurestorageblobSinkConnector.properties looks like this:

name=CamelAzure-storage-blobSinkConnector
connector.class=org.apache.camel.kafkaconnector.azurestorageblob.CamelAzurestorageblobSinkConnector
tasks.max=1
camel.sink.marshal=json-jackson
# comma separated topics to get messages from
topics=test-topic

camel.sink.path.accountName=<storage-account>
camel.sink.path.containerName=<blob-container>
camel.sink.endpoint.blobName=data/test-topic/${date:now:yyyyMMdd}/${date:now:HH}-id.json
camel.sink.endpoint.accessKey=<account-key>
camel.sink.endpoint.operation=commitAppendBlob
camel.sink.endpoint.createAppendBlob=true
camel.sink.endpoint.blobType=appendblob

Any help would be appreciated!

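First of all, thanks to @OneCricketeer for the suggestion.

I was able to append a line separator to each event/record as it passes through the sink connector.

The following configuration was done at the Camel component level: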

  1. Change the key and value converters from JsonConverter to StringConverter. Here is the modified CamelAzurestorageblobSinkConnector.properties:
name=CamelAzure-storage-blobSinkConnector
connector.class=org.apache.camel.kafkaconnector.azurestorageblob.CamelAzurestorageblobSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=test-topic
camel.sink.path.accountName=<storage-account>
camel.sink.path.containerName=<blob-container>
camel.sink.endpoint.blobName=data/test-topic/${date:now:yyyyMMdd}/${date:now:HH}-id.json
camel.sink.endpoint.accessKey=<account-key>
camel.sink.endpoint.operation=commitAppendBlob
camel.sink.endpoint.createAppendBlob=true
camel.sink.endpoint.blobType=appendblob
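  2. Override the apply() method in a custom Single Message Transform (SMT) like this:
 @Override
 public R apply(R record) {
    // The value arrives as a String because of the StringConverter configured above
    String value = (String) operatingValue(record);

    // Pass records without a value (e.g. tombstones) through unchanged
    if (value == null) {
      return record;
    }

    // Terminate the record with a line separator so each one lands on its own line
    String updatedValue = value + System.lineSeparator();
    System.out.println("UPDATED SMT RECORD: " + updatedValue);

    return newRecord(record, null, updatedValue);
  }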
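
One thing to watch for with the string handling in apply(): `System.lineSeparator()` depends on the platform the Connect worker runs on, and concatenating a null String in Java produces the literal text "null". Below is a minimal, dependency-free sketch of just that string logic, assuming you want a fixed "\n" and do not want to double up on values that already end with a newline. `LineTerminator` and `terminate` are hypothetical names for illustration, not part of Camel or Kafka Connect.

```java
// Hypothetical helper: isolates the "append a newline" logic so it can be
// unit-tested without a Kafka Connect runtime.
final class LineTerminator {

    private LineTerminator() {
        // static helper only, no instances
    }

    // Returns the value terminated by exactly one trailing "\n".
    // A null value is returned as null instead of becoming the text "null".
    static String terminate(String value) {
        if (value == null) {
            return null;
        }
        if (value.endsWith("\n")) {
            // Already terminated, so do not add a second line break
            return value;
        }
        return value + "\n";
    }
}
```

Inside the SMT, the concatenation line could then call `LineTerminator.terminate(value)` instead. Whether a fixed "\n" is preferable to the platform separator depends on what consumes the blob downstream.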

Now the records inside the blob are written one per line, like this:

{"uuid":"6e7190e2-987d-44f5-9b20-ba854d8d4274","foo":"bar"} 
{"uuid":"6f0d3912-b7c1-4cc4-a41b-0d54cd623373","foo":"bar"}
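
With one record per line, downstream processing can split the blob content on line breaks. Here is a rough sketch of that step, assuming the blob has already been downloaded into a String (how you download it is up to you). `BlobLines` and `records` are hypothetical names, and the sketch needs Java 11+ for `String.lines()` and `String.strip()`.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: splits newline-delimited blob content into records.
final class BlobLines {

    private BlobLines() {
        // static helper only, no instances
    }

    // Splits on \n, \r or \r\n, trims surrounding whitespace from each line,
    // and drops empty lines (for example the one after the final separator).
    static List<String> records(String blobContent) {
        return blobContent.lines()
                .map(String::strip)
                .filter(line -> !line.isEmpty())
                .collect(Collectors.toList());
    }
}
```

Each returned element should then be a single JSON document that can be handed to whatever JSON parser you already use.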