Scala : Unable to send message to Kafka (hosted on remote server)
I am using Scala 2.12, the libraries needed to convert the message to Avro (the conversion is required), and the Kafka client.
I am running the code on a Linux host (dev) on which another application (Apache NiFi) is also running, and that application is able to create a KafkaProducer and publish messages to the remote Kafka.
Since this is dev for now, the protocol is PLAINTEXT.
For reference, the KafkaProducer configuration in NiFi:
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 5000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
In addition, NiFi is started with a Java option pointing to a JAAS file, whose contents are:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
principal="myUserName@myRealm"
useKeyTab=true
client=true
keyTab="/path/myfile.keytab"
serviceName="kafka";
};
The krb5.conf file is also available.
With the above configuration, NiFi is able to create a KafkaProducer and send messages.
Now I am trying the same from Scala code. A simple class uses the following build.sbt and code to send the message.
build.sbt:
// https://mvnrepository.com/artifact/org.apache.avro/avro
libraryDependencies += "org.apache.avro" % "avro" % "1.8.1"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"
libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.4"
fork in run := true
javaOptions += "-Djava.security.auth.login.config=/path/to/jaas/kafka-jaas.conf"
javaOptions += "-Djava.security.krb5.conf=/path/to/krb/krb5.conf"
My code to send the message is below (unneeded lines removed for brevity). Note that the test that builds the Avro data runs fine; when the same message is handed to NiFi, it is published to the topic correctly. What is not working is publishing to Kafka from Scala.
Code:
package example
import java.io.ByteArrayOutputStream
import java.util
import java.io.File
import java.util.{Properties, UUID}
import org.apache.avro.Schema.Parser
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.io.Source
import scala.io.StdIn
object Hello extends Greeting with App {
// case classes for creating avro record
// This part works fine.
val schemaFile = "/path/Schema.avsc"
val schema = new Schema.Parser().parse(new File(schemaFile))
val reader = new GenericDatumReader[GenericRecord](schema)
val avroRecord = new GenericData.Record(schema)
// populate correctly the record.
// works fine.
val brokers = "server1.domain:9096,server2.domain:9096,server3.domain:9096"
val topic = "myTopic"
private def configuration: Properties = {
val props = new Properties()
props.put("bootstrap.servers", brokers)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("security.protocol", "PLAINTEXT")
props.put("sasl.kerberos.service.name", "kafka")
props.put("acks", "all")
props.put("retries","0")
props
}
val producer = new KafkaProducer[String, Array[Byte]](configuration)
val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get.binaryEncoder(out, null)
writer.write(avroRecord, encoder)
encoder.flush()
out.close()
val serializedBytes: Array[Byte] = out.toByteArray()
val recordToSend = new ProducerRecord[String, Array[Byte]](topic, serializedBytes)
producer.send(recordToSend)
}
trait Greeting {
lazy val greeting: String = "hello"
}
When I run it from the sbt command line:
sbt clean
sbt compile
sbt run
I get the following error/output, and nothing is published.
Output:
-bash-4.2$ sbt run
[warn] Executing in batch mode.
[warn] For better performance, hit [ENTER] to switch to interactive mode, or
[warn] consider launching sbt without any commands, or explicitly passing 'shell'
[info] Loading project definition from /path/Scala/hello-world/project
[info] Set current project to hello-world (in build file:/path/Scala/hello-world/)
[info] Running example.Hello
[info] hello
[info]
[error] 9 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
[error] acks = 1
[error] batch.size = 16384
[error] bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
[error] buffer.memory = 33554432
[error] client.dns.lookup = default
[error] client.id =
[error] compression.type = none
[error] connections.max.idle.ms = 540000
[error] delivery.timeout.ms = 120000
[error] enable.idempotence = false
[error] interceptor.classes = []
[error] key.serializer = class org.apache.kafka.common.serialization.StringSerializer
[error] linger.ms = 0
[error] max.block.ms = 60000
[error] max.in.flight.requests.per.connection = 5
[error] max.request.size = 1048576
[error] metadata.max.age.ms = 300000
[error] metric.reporters = []
[error] metrics.num.samples = 2
[error] metrics.recording.level = INFO
[error] metrics.sample.window.ms = 30000
[error] partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
[error] receive.buffer.bytes = 32768
[error] reconnect.backoff.max.ms = 1000
[error] reconnect.backoff.ms = 50
[error] request.timeout.ms = 30000
[error] retries = 0
[error] retry.backoff.ms = 100
[error] sasl.client.callback.handler.class = null
[error] sasl.jaas.config = null
[error] sasl.kerberos.kinit.cmd = /usr/bin/kinit
[error] sasl.kerberos.min.time.before.relogin = 60000
[error] sasl.kerberos.service.name = kafka
[error] sasl.kerberos.ticket.renew.jitter = 0.05
[error] sasl.kerberos.ticket.renew.window.factor = 0.8
[error] sasl.login.callback.handler.class = null
[error] sasl.login.class = null
[error] sasl.login.refresh.buffer.seconds = 300
[error] sasl.login.refresh.min.period.seconds = 60
[error] sasl.login.refresh.window.factor = 0.8
[error] sasl.login.refresh.window.jitter = 0.05
[error] sasl.mechanism = GSSAPI
[error] security.protocol = PLAINTEXT
[error] send.buffer.bytes = 131072
[error] ssl.cipher.suites = null
[error] ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
[error] ssl.endpoint.identification.algorithm =
[error] ssl.key.password = null
[error] ssl.keymanager.algorithm = SunX509
[error] ssl.keystore.location = null
[error] ssl.keystore.password = null
[error] ssl.keystore.type = JKS
[error] ssl.protocol = TLS
[error] ssl.provider = null
[error] ssl.secure.random.implementation = null
[error] ssl.trustmanager.algorithm = PKIX
[error] ssl.truststore.location = null
[error] ssl.truststore.password = null
[error] ssl.truststore.type = JKS
[error] transaction.timeout.ms = 60000
[error] transactional.id = null
[error] value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[error]
[error] 109 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[error] 109 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[error] 248 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 5NMDh7lDS-SxXpgprjR6oA
[success] Total time: 1 s, completed Mar 6, 2019 1:38:14 PM
I am pretty sure it is related to security or Kerberos. But the other application is able to push messages, while my Scala code is not.
Update:
Based on @tgrez's reply, I tried blocking on the Future with get.
//producer.send(recordToSend)
val metaF: Future[RecordMetadata] = producer.send(recordToSend)
val meta = metaF.get() //blocking
val msgLog =
s"""
|offset = ${meta.offset()}
|partition = ${meta.partition()}
|topic = ${meta.topic()}
""".stripMargin
println(msgLog)
producer.close()
However, I still see similar errors in the output.
[error] 10 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
[error] acks = 1
[error] batch.size = 16384
[error] bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
[error] buffer.memory = 33554432
[error] client.dns.lookup = default
[error] client.id =
[error] compression.type = none
[error] connections.max.idle.ms = 540000
[error] delivery.timeout.ms = 120000
[error] enable.idempotence = false
[error] interceptor.classes = []
[error] key.serializer = class org.apache.kafka.common.serialization.StringSerializer
[error] linger.ms = 0
[error] max.block.ms = 60000
[error] max.in.flight.requests.per.connection = 5
[error] max.request.size = 1048576
[error] metadata.max.age.ms = 300000
[error] metric.reporters = []
[error] metrics.num.samples = 2
[error] metrics.recording.level = INFO
[error] metrics.sample.window.ms = 30000
[error] partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
[error] receive.buffer.bytes = 32768
[error] reconnect.backoff.max.ms = 1000
[error] reconnect.backoff.ms = 50
[error] request.timeout.ms = 30000
[error] retries = 0
[error] retry.backoff.ms = 100
[error] sasl.client.callback.handler.class = null
[error] sasl.jaas.config = null
[error] sasl.kerberos.kinit.cmd = /usr/bin/kinit
[error] sasl.kerberos.min.time.before.relogin = 60000
[error] sasl.kerberos.service.name = kafka
[error] sasl.kerberos.ticket.renew.jitter = 0.05
[error] sasl.kerberos.ticket.renew.window.factor = 0.8
[error] sasl.login.callback.handler.class = null
[error] sasl.login.class = null
[error] sasl.login.refresh.buffer.seconds = 300
[error] sasl.login.refresh.min.period.seconds = 60
[error] sasl.login.refresh.window.factor = 0.8
[error] sasl.login.refresh.window.jitter = 0.05
[error] sasl.mechanism = GSSAPI
[error] security.protocol = PLAINTEXT
[error] send.buffer.bytes = 131072
[error] ssl.cipher.suites = null
[error] ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
[error] ssl.endpoint.identification.algorithm =
[error] ssl.key.password = null
[error] ssl.keymanager.algorithm = SunX509
[error] ssl.keystore.location = null
[error] ssl.keystore.password = null
[error] ssl.keystore.type = JKS
[error] ssl.protocol = TLS
[error] ssl.provider = null
[error] ssl.secure.random.implementation = null
[error] ssl.trustmanager.algorithm = PKIX
[error] ssl.truststore.location = null
[error] ssl.truststore.password = null
[error] ssl.truststore.type = JKS
[error] transaction.timeout.ms = 60000
[error] transactional.id = null
[error] value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[error]
[error] 110 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[error] 110 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[error] 249 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 5NMDh7lDS-SxXpgprjR6oA
[info]
[info] offset = 8
[info] partition = 1
[info] topic = myTopic
[info]
[error] 323 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[success] Total time: 1 s, completed Mar 6, 2019 3:26:53 PM
Am I missing something here?
Update 2:
As suggested below, I changed the code, but it did not work either. I realized something was wrong with the serialization.
I already have the avroRecord in GenericData.Record format. Can't I use it to publish the data to Kafka? Why do I have to use a byte array or some other serializer for this?
The only examples I found use the io.confluent Avro serializer. But I could not use it, because sbt or Maven cannot download it at the moment; in fact the URL http://packages.confluent.io/maven/ is not working. I downloaded the jars by other means and used them as external libraries.
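For reference, when the repository is reachable, the usual sbt wiring for the Confluent serializer would be something along these lines (the version shown is only illustrative):
resolvers += "Confluent" at "http://packages.confluent.io/maven/"
// the kafka-avro-serializer version should match the Confluent release paired with your Kafka client
libraryDependencies += "io.confluent" % "kafka-avro-serializer" % "5.1.2"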
Changes to the code:
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
val producer = new KafkaProducer[String, GenericData.Record](configuration)
val recordToSend = new ProducerRecord[String, GenericData.Record](topic, avroRecord)
Now everything works fine.
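One note for completeness: the Confluent KafkaAvroSerializer also expects the schema registry endpoint to be configured, roughly as below (the URL is a placeholder):
props.put("schema.registry.url", "http://my-schema-registry:8081")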
However, I am still looking for some other serializer class (available in Maven) that can send the message as GenericData instead of a byte array.
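A hand-rolled alternative is sketched below (the class name is made up; unlike the Confluent serializer it writes plain Avro binary without any schema id):
import java.io.ByteArrayOutputStream
import java.util
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
import org.apache.kafka.common.serialization.Serializer
class GenericRecordSerializer extends Serializer[GenericRecord] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
  override def serialize(topic: String, record: GenericRecord): Array[Byte] = {
    if (record == null) null
    else {
      // write the record as Avro binary using its own schema
      val out = new ByteArrayOutputStream()
      val writer = new GenericDatumWriter[GenericRecord](record.getSchema)
      val encoder = EncoderFactory.get.binaryEncoder(out, null)
      writer.write(record, encoder)
      encoder.flush() // flush the encoder before reading the bytes
      out.toByteArray
    }
  }
  override def close(): Unit = ()
}
With that in place, the producer can be typed as KafkaProducer[String, GenericRecord] and the value serializer set via props.put("value.serializer", classOf[GenericRecordSerializer].getName).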
Update 3:
Following the suggestion from user @KZapagol, I tried the same approach and got the error below.
Schema: (it is complex; I need help checking whether I am converting the data correctly)
{"type": "record","name": "MyPnl","doc": "This schema contains the metadata fields wrapped in a header field which follows the official schema.","fields": [{"name":"header","type":{"type":"record","name":"header","fields":[{"name":"messageId","type":"string"},{"name":"businessId","type":"string"},{"name":"batchId","type":"string"},{"name":"sourceSystem","type":"string"},{"name":"secondarySourceSystem","type":[ "null", "string" ]},{"name":"sourceSystemCreationTimestamp","type":"long","logicalType": "timestamp-millis"},{"name":"sentBy","type":"string"},{"name":"sentTo","type":"string"},{"name":"messageType","type":"string"},{"name":"schemaVersion","type":"string"},{"name":"processing","type":"string"},{"name":"recordOffset","type":[ "null", "string" ]}]}},{"name":"pnlData","type":{"type":"record","name":"pnlData","fields":[{"name":"pnlHeader","type":{"type":"record","name":"pnlData","namespace":"pnlHeader","fields":[{"name":"granularity","type":"string"},{"name":"pnlType","type":"string"},{"name":"pnlSubType","type":"string"},{"name":"businessDate","type":"string","logicalType": "date"},{"name":"bookId","type":"string"},{"name":"bookDescription","type":"string"},{"name":"pnlStatus","type":"string"}]}},{"name":"pnlBreakDown","type":{"type":"array","items":{"type":"record","name":"pnlData","namespace":"pnlBreakDown","fields":[{"name":"category","type":[ "null", "string" ]},{"name":"subCategory","type":[ "null", "string" ]},{"name":"riskCategory","type":[ "null", "string" ]},{"name":"pnlCurrency","type":"string"},{"name":"pnlDetails", "type":{"type":"array","items": {"type":"record","name":"pnlData","namespace":"pnlDetails","fields":[{"name":"pnlLocalAmount","type":"double"},{"name":"pnlCDEAmount","type":"double"}]}}}]}}}]}}]}
I have the corresponding case classes for the above. (Please point out if I am missing anything.)
case class MessageHeader( messageId: String,
businessId: String,
batchId: String,
sourceSystem: String,
secondarySourceSystem: String,
sourceSystemCreationTimestamp: Long,
sentBy: String,
sentTo: String,
messageType: String,
schemaVersion: String,
processing: String,
recordOffset: String
)
case class PnlHeader ( granularity: String,
pnlType: String,
pnlSubType: String,
businessDate: String,
bookId: String,
bookDescription: String,
pnlStatus: String
)
case class PnlDetails ( pnlLocalAmount: Double,
pnlCDEAmount: Double
)
case class PnlBreakdown ( category: String,
subCategory: String,
riskCategory: String,
pnlCurrency: String,
pnlDetails: List[PnlDetails]
)
case class PnlData ( pnlHeader: PnlHeader, pnlBreakdown: List[PnlBreakdown] )
case class PnlRecord (header: MessageHeader, pnlData: PnlData )
I have modeled my data into the PnlRecord format above, and I have a list of such records.
From that list of records, I iterate and try to publish each one to Kafka.
// Create Producer
val producer = new KafkaProducer[String, Array[Byte]](properties)
// This filename is file where above schema is saved.
val avroJsonSchema = Source.fromFile(new File(schemaFileName)).getLines.mkString
val avroMessage = new AvroMessage(avroJsonSchema)
val avroRecord = new Record(avroMessage.schema)
// recordListToSend is of type: List[PnlRecord]
for (record <- recordListToSend) {
avroRecord.put("header", record.header)
avroRecord.put("pnlData", record.pnlData)
//logger.info(s"Record: ${avroRecord}\n")
avroMessage.gdw.write(avroRecord, EncoderFactory.get().binaryEncoder(avroMessage.baos, null))
avroMessage.dfw.append(avroRecord)
avroMessage.dfw.close()
val bytes = avroMessage.baos.toByteArray
// send data
producer.send(new ProducerRecord[String, Array[Byte]](topic, bytes), new ProducerCallback)
//flush data
producer.flush()
//flush and close producer
producer.close()
}
The AvroMessage class (as suggested by the user):
import java.io.ByteArrayOutputStream
import org.apache.avro
import org.apache.avro.Schema
import org.apache.avro.file.CodecFactory
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
class AvroMessage(avroJsonSchema: String) {
val parser = new Schema.Parser()
val schema = parser.parse(avroJsonSchema)
val baos = new ByteArrayOutputStream()
val gdw = new GenericDatumWriter[GenericRecord](schema)
val dfw = new avro.file.DataFileWriter[GenericRecord](gdw)
val compressionLevel = 5
dfw.setCodec(CodecFactory.deflateCodec(compressionLevel))
dfw.create(schema, baos)
}
I get the following error:
2019-03-13 16:00:09.855 [application-akka.actor.default-dispatcher-11] ERROR controllers.SAController.$anonfun$publishToSA(34) - com.domain.sa.model.MessageHeader cannot be cast to org.apache.avro.generic.IndexedRecord
java.lang.ClassCastException: ca.domain.my.sa.model.MessageHeader cannot be cast to org.apache.avro.generic.IndexedRecord
at org.apache.avro.generic.GenericData.getField(GenericData.java:697)
at org.apache.avro.generic.GenericData.getField(GenericData.java:712)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:164)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at ca.domain.my.sa.dao.myPnlDao$.$anonfun$publishAvroToKafka(myPnlDao.scala:95)
Does my original case class conform to the schema?
My MessageHeader case class is as shown above.
My schema is as shown above (updated).
My record:
Record: {"header": Header(my_20190313180602_00000011,my_BookLevel_Daily_Regular_20181130_EMERGINGTRS,11_20181130_8259,my,null,65162584,my,SA,PnLMessage,test,RealTime,null), "pnlData": PnlData(PnlHeader(BookLevel,Daily,Regular,2018-11-30,8259,EMERGINGTRS,Locked),List(PnlBreakdown(null,null,null,eur,List(PnlDetails(0.0,0.0022547507286072))), PnlBreakdown(null,null,null,jpy,List(PnlDetails(0.0,0.0))), PnlBreakdown(null,null,null,usd,List(PnlDetails(0.19000003399301,0.642328574985149))), PnlBreakdown(null,null,null,brl,List(PnlDetails(2.65281414613128E-8,2.4107750505209E-5))), PnlBreakdown(null,null,null,gbp,List(PnlDetails(0.0,-5.05781173706088E-5))), PnlBreakdown(null,null,null,cad,List(PnlDetails(145.399999991953,145.399999991953)))))}
It might be simpler than it looks. The send method is asynchronous; it returns a Future<RecordMetadata>. Your example exits before the message is actually sent.
The Kafka producer batches messages in the background, so to make sure a message is sent you should either block on Future.get (which means waiting until the broker responds with the metadata) or flush the buffers with kafkaProducer.flush().
In tests I would suggest blocking on the Future.
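A minimal sketch of both options, reusing the producer and recordToSend from the question (the println is just for illustration):
import org.apache.kafka.clients.producer.RecordMetadata
// Option 1: block until the broker acknowledges the record.
val metadata: RecordMetadata = producer.send(recordToSend).get()
println(s"written to ${metadata.topic()}-${metadata.partition()} at offset ${metadata.offset()}")
// Option 2: keep send asynchronous, but flush (and close) before the JVM exits.
producer.send(recordToSend)
producer.flush()
producer.close()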
Please update your code as below and try again. It looks like you are not closing the output stream, the encoder and the producer properly.
val producer = new KafkaProducer[String, Array[Byte]](configuration)
val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get.binaryEncoder(out, null)
writer.write(avroRecord, encoder)
encoder.flush()
out.close()
val serializedBytes: Array[Byte] = out.toByteArray()
val recordToSend = new ProducerRecord[String, Array[Byte]](topic, serializedBytes)
producer.send(recordToSend,new ProducerCallback)
//flush data
producer.flush()
//flush and close producer
producer.close()
class ProducerCallback(implicit logger: Logger) extends Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
//executes every time a record is successfully sent or exception thrown
Option(metadata) match {
case Some(_) =>
logger.info("Received new metadata. \n" +
"Topic: " + metadata.topic() + "\n" +
"Partition: " + metadata.partition() + "\n" +
"Offset: " + metadata.offset() + "\n" +
"Timestamp: " + metadata.timestamp() + "\n" +
"Checksum: " + metadata.checksum())
case None => ;
}
Option(exception) match {
case Some(_) =>
logger.error("Exception thrown during processing of record... " + exception)
throw exception
case None => ;
}
}
}
Please refer to https://github.com/Zapagol/apache-kafka/tree/master/src/main/scala/com/org/apache for more Kafka producer and consumer examples. Hope it helps!
Update
I have added a KafkaProducer example for Avro schema input. Please refer to https://github.com/Zapagol/apache-kafka/blob/master/src/main/scala/com/org/apache/producers/ProducerForAvroschema.scala.
I used the Apache Avro jar and a sample avsc file as below. Please modify the schema file according to your requirement. I was able to produce records successfully.
{
"type": "record",
"name": "employee",
"fields": [
{"name": "name", "type": "string"},
{"name": "id", "type": "int"},
{"name": "mobileNumber", "type": ["string", "null"]},
{"name": "salary", "type": ["int", "null"]}
]
}
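For the employee schema above, populating and serializing a GenericRecord would look roughly like the sketch below (the schema path is a placeholder). The key point, which also explains the ClassCastException in Update 3, is that record-typed fields must be filled with plain values or nested GenericData.Record instances, not with Scala case classes:
import java.io.{ByteArrayOutputStream, File}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
val schema = new Schema.Parser().parse(new File("/path/employee.avsc"))
// Fill the record with plain values (nested records would themselves be GenericData.Record).
val employee = new GenericData.Record(schema)
employee.put("name", "John")
employee.put("id", 1)
employee.put("mobileNumber", "1234567890")
employee.put("salary", 10000)
// Serialize to Avro binary bytes, ready for a ProducerRecord[String, Array[Byte]].
val out = new ByteArrayOutputStream()
val writer = new GenericDatumWriter[GenericRecord](schema)
val encoder = EncoderFactory.get.binaryEncoder(out, null)
writer.write(employee, encoder)
encoder.flush()
val employeeBytes: Array[Byte] = out.toByteArray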