Append a newline character to every record passing through the CamelKafkaAzureBlobSink connector
We are currently working on a data pipeline stack in which we use the CamelAzurestorageblobSinkConnector (0.9.x). It reads a specific topic from Kafka (cp-kafka-5.0.0) and appends each record to a specific Azure AppendBlob.
The sync itself works fine, but we have found a glitch in the stack.
The JSON records are appended to the blob file without any line break, like below:
{"uuid":"6e7190e2-987d-44f5-9b20-ba854d8d4274","foo":"bar"}{"uuid":"6f0d3912-b7c1-4cc4-a41b-0d54cd623373","foo":"bar"}{"foo":"bar"}
This affects further processing of the blob file.
Our CamelAzurestorageblobSinkConnector.properties looks like this:
name=CamelAzure-storage-blobSinkConnector
connector.class=org.apache.camel.kafkaconnector.azurestorageblob.CamelAzurestorageblobSinkConnector
tasks.max=1
camel.sink.marshal=json-jackson
# comma-separated topics to get messages from
topics=test-topic
camel.sink.path.accountName=<storage-account>
camel.sink.path.containerName=<blob-container>
camel.sink.endpoint.blobName=data/test-topic/${date:now:yyyyMMdd}/${date:now:HH}-id.json
camel.sink.endpoint.accessKey=<account-key>
camel.sink.endpoint.operation=commitAppendBlob
camel.sink.endpoint.createAppendBlob=true
camel.sink.endpoint.blobType=appendblob
Any help would be greatly appreciated!
First of all, thanks to @OneCricketeer for the suggestion.
I was able to append a line separator to each event/record as it passes through the sink connector.
The following configuration was done at the Camel component level:
- Modify the key and value converters from JsonConverter to StringConverter. Here is the modified CamelAzurestorageblobSinkConnector.properties:
name=CamelAzure-storage-blobSinkConnector
connector.class=org.apache.camel.kafkaconnector.azurestorageblob.CamelAzurestorageblobSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=test-topic
camel.sink.path.accountName=<storage-account>
camel.sink.path.containerName=<blob-container>
camel.sink.endpoint.blobName=data/test-topic/${date:now:yyyyMMdd}/${date:now:HH}-id.json
camel.sink.endpoint.accessKey=<account-key>
camel.sink.endpoint.operation=commitAppendBlob
camel.sink.endpoint.createAppendBlob=true
camel.sink.endpoint.blobType=appendblob
- Override the apply() method of a custom Single Message Transform (SMT) like this:
@Override
public R apply(R record) {
    // Treat the record value as a plain string (StringConverter is in use)
    String value = (String) operatingValue(record);
    // Append the platform line separator so each record lands on its own line
    String updatedValue = value + System.lineSeparator();
    System.out.println("UPDATED SMT RECORD: " + updatedValue);
    return newRecord(record, null, updatedValue);
}
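Defining the SMT alone is not enough; it also has to be registered in the connector properties so Kafka Connect applies it to each record. A hedged sketch, assuming the transform is packaged as com.example.AppendLineSeparator (a hypothetical class name, substitute your own) and its jar is on the Connect worker's plugin path:

```
transforms=appendNewline
transforms.appendNewline.type=com.example.AppendLineSeparator
```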
Now the records inside the blob look like below:
{"uuid":"6e7190e2-987d-44f5-9b20-ba854d8d4274","foo":"bar"}
{"uuid":"6f0d3912-b7c1-4cc4-a41b-0d54cd623373","foo":"bar"}
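For context, the apply() override above lives inside a custom SMT built against Kafka Connect's Transformation API, where operatingValue() and newRecord() are provided by the base class. The Connect API itself cannot be reproduced here, but the core of the transform is just string concatenation; a minimal standalone sketch of that step (class and method names are illustrative, not from the original SMT):

```java
public class AppendLineSeparatorDemo {

    // Same logic as the SMT's apply(): take the record value as a string
    // and append the platform line separator.
    static String appendSeparator(String value) {
        return value + System.lineSeparator();
    }

    public static void main(String[] args) {
        String r1 = appendSeparator("{\"uuid\":\"6e7190e2\",\"foo\":\"bar\"}");
        String r2 = appendSeparator("{\"uuid\":\"6f0d3912\",\"foo\":\"bar\"}");
        // Concatenating the transformed records now yields one JSON object
        // per line, matching the blob contents shown above.
        System.out.print(r1 + r2);
    }
}
```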