Avro 和 Kafka 通过使用 SchemaBuilder
Avro and Kafka by making use of SchemaBuilder
我完成了 baeldung 的教程。他们提到有两种创建模式的方法。
- 通过编写 json 表示并添加 maven 插件来生成 class
- 通过使用
SchemaBuilder
,他们也提到这是更好的选择。
不幸的是,在 git 示例中我只看到了 json 方式。
假设我有这个 Avro 模式:
{
"type":"record",
"name":"TestFile",
"namespace":"com.example.kafka.data.ingestion.model",
"fields":[
{
"name":"date",
"type":"long"
},
{
"name":"counter",
"type":"int"
},
{
"name":"mc",
"type":"string"
}
]
}
通过在我的 pom 文件中添加这个插件:
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.0</version>
<executions>
<execution>
<id>schemas</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
并使用生成源构建一个 TestFile.java
到我说的目的地。
然后为了发送到 kafka 主题,我可以执行以下操作:
TestFile test = TestFile.newBuilder()
.setDate(102928374747)
.setCounter(2)
.setMc("Some string")
.build();
kafkaTemplate.send(topicName, test);
用 SchemaBuilder
创建模式的等价物是:
Schema testFileSchema = SchemaBuilder .record("TestFile")
.namespace("com.example.kafka.data.ingestion.model")
.fields()
.requiredLong("date")
.requiredInt("counter")
.requiredString("mc")
.endRecord();
但是我现在如何生成 POJO 并将我的 TestFile
数据发送到我的 kafka 主题?
您将无法访问 TestFile
对象,因为架构是在运行时创建的,而不是预编译的。如果你想保留那个 POJO,那么你需要 public TestFile(GenericRecord avroRecord)
的构造函数
您需要使用该 Schema
对象创建一个 GenericRecord
,就像您从字符串或文件中解析它一样。
例如,
Schema schema = SchemaBuilder.record("TestFile")
.namespace("com.example.kafka.data.ingestion.model")
.fields()
.requiredLong("date")
.requiredInt("counter")
.requiredString("mc")
.endRecord();
GenericRecord entry1 = new GenericData.Record(schema);
entry1.put("date", 1L);
entry1.put("counter", 2);
entry1.put("mc", "3");
// producer.send(new ProducerRecord<>(topic, entry1);
完整的 Kafka 示例是 available from Confluent
如果你不包含必填字段,它会抛出错误,并且不会检查类型的值(我可以输入 "counter", "2"
,它会发送一个字符串值(这对我来说似乎是个错误。基本上,GenericRecord == HashMap<String, Object>
具有 reqiured/nullable 字段的额外好处。
并且您将需要配置一个 Avro 序列化程序,例如 Confluent 的序列化程序,它需要 运行 他们的 Schema Registry,或者像 Cloudera shows
这样的版本
否则,您需要将 Avro 对象转换为 byte[]
(如您的链接所示,只需使用 ByteArraySerializer
如 Baeldung 教程中所述:
Later we can apply the toString method to get the JSON structure of
Schema.
因此,例如在主 class 中使用此代码,您可以将两个模式定义打印到控制台输出。
然后您可以将生成的 json 表示保存到 .avsc 文件并像以前一样生成 pojo。
Schema clientIdentifier = SchemaBuilder.record("ClientIdentifier")
.namespace("com.baeldung.avro")
.fields().requiredString("hostName").requiredString("ipAddress")
.endRecord();
System.out.println(clientIdentifier.toString());
Schema avroHttpRequest = SchemaBuilder.record("AvroHttpRequest")
.namespace("com.baeldung.avro")
.fields().requiredLong("requestTime")
.name("clientIdentifier")
.type(clientIdentifier)
.noDefault()
.name("employeeNames")
.type()
.array()
.items()
.stringType()
.arrayDefault(new ArrayList<>())
.name("active")
.type()
.enumeration("Active")
.symbols("YES","NO")
.noDefault()
.endRecord();
System.out.println(avroHttpRequest.toString());
还有第三种生成 Avro 模式的方法,即使用 Avro IDL
我完成了 baeldung 的教程。他们提到有两种创建模式的方法。
- 通过编写 json 表示并添加 maven 插件来生成 class
- 通过使用
SchemaBuilder
,他们也提到这是更好的选择。
不幸的是,在 git 示例中我只看到了 json 方式。
假设我有这个 Avro 模式:
{
"type":"record",
"name":"TestFile",
"namespace":"com.example.kafka.data.ingestion.model",
"fields":[
{
"name":"date",
"type":"long"
},
{
"name":"counter",
"type":"int"
},
{
"name":"mc",
"type":"string"
}
]
}
通过在我的 pom 文件中添加这个插件:
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.0</version>
<executions>
<execution>
<id>schemas</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
并使用生成源构建一个 TestFile.java
到我说的目的地。
然后为了发送到 kafka 主题,我可以执行以下操作:
TestFile test = TestFile.newBuilder()
.setDate(102928374747)
.setCounter(2)
.setMc("Some string")
.build();
kafkaTemplate.send(topicName, test);
用 SchemaBuilder
创建模式的等价物是:
Schema testFileSchema = SchemaBuilder .record("TestFile")
.namespace("com.example.kafka.data.ingestion.model")
.fields()
.requiredLong("date")
.requiredInt("counter")
.requiredString("mc")
.endRecord();
但是我现在如何生成 POJO 并将我的 TestFile
数据发送到我的 kafka 主题?
您将无法访问 TestFile
对象,因为架构是在运行时创建的,而不是预编译的。如果你想保留那个 POJO,那么你需要 public TestFile(GenericRecord avroRecord)
您需要使用该 Schema
对象创建一个 GenericRecord
,就像您从字符串或文件中解析它一样。
例如,
Schema schema = SchemaBuilder.record("TestFile")
.namespace("com.example.kafka.data.ingestion.model")
.fields()
.requiredLong("date")
.requiredInt("counter")
.requiredString("mc")
.endRecord();
GenericRecord entry1 = new GenericData.Record(schema);
entry1.put("date", 1L);
entry1.put("counter", 2);
entry1.put("mc", "3");
// producer.send(new ProducerRecord<>(topic, entry1);
完整的 Kafka 示例是 available from Confluent
如果你不包含必填字段,它会抛出错误,并且不会检查类型的值(我可以输入 "counter", "2"
,它会发送一个字符串值(这对我来说似乎是个错误。基本上,GenericRecord == HashMap<String, Object>
具有 reqiured/nullable 字段的额外好处。
并且您将需要配置一个 Avro 序列化程序,例如 Confluent 的序列化程序,它需要 运行 他们的 Schema Registry,或者像 Cloudera shows
这样的版本否则,您需要将 Avro 对象转换为 byte[]
(如您的链接所示,只需使用 ByteArraySerializer
如 Baeldung 教程中所述:
Later we can apply the toString method to get the JSON structure of Schema.
因此,例如在主 class 中使用此代码,您可以将两个模式定义打印到控制台输出。
然后您可以将生成的 json 表示保存到 .avsc 文件并像以前一样生成 pojo。
Schema clientIdentifier = SchemaBuilder.record("ClientIdentifier")
.namespace("com.baeldung.avro")
.fields().requiredString("hostName").requiredString("ipAddress")
.endRecord();
System.out.println(clientIdentifier.toString());
Schema avroHttpRequest = SchemaBuilder.record("AvroHttpRequest")
.namespace("com.baeldung.avro")
.fields().requiredLong("requestTime")
.name("clientIdentifier")
.type(clientIdentifier)
.noDefault()
.name("employeeNames")
.type()
.array()
.items()
.stringType()
.arrayDefault(new ArrayList<>())
.name("active")
.type()
.enumeration("Active")
.symbols("YES","NO")
.noDefault()
.endRecord();
System.out.println(avroHttpRequest.toString());
还有第三种生成 Avro 模式的方法,即使用 Avro IDL