Parsing JSON data using Apache Kafka Streaming
I have a scenario where I read JSON data from a Kafka topic, and using Kafka 0.11 I need to write Java code that streams this JSON data to another Kafka topic. My input is JSON data containing an array of dictionaries.
My requirement is to extract the "text" field from each dictionary in the array and pass all of those text tweets to another topic through Kafka Streaming.
I have written code up to this point.
Please help me parse the data.
Java streaming code:
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

KStreamBuilder builder = new KStreamBuilder();
KStream<String, JsonNode> personstwitter = builder.stream(Serdes.String(), jsonSerde, "Persons"); // taking the JSON node as input
personstwitter.to(Serdes.String(), jsonSerde, "Persons-output"); // currently forwards the full JSON unchanged
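For the stated requirement, one way to extract the "text" field of every dictionary in the incoming array and forward only those strings is a flatMapValues over the stream above. This is a minimal sketch, assuming each record value is a JSON array whose elements carry a "text" key (it additionally needs java.util.List and java.util.ArrayList imports):

KStream<String, String> texts = personstwitter.flatMapValues(value -> {
    List<String> result = new ArrayList<>();
    if (value != null && value.isArray()) {
        for (JsonNode element : value) {
            JsonNode text = element.get("text");
            if (text != null) {
                result.add(text.asText()); // keep only the tweet text
            }
        }
    }
    return result;
});
texts.to(Serdes.String(), Serdes.String(), "Persons-output"); // forward the plain strings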
I would suggest the following to get better control over the JSON data:
- Write a custom Serializer and Deserializer.
- Create a POJO based on your JSON string. A POJO is the best way to have more control over the data.
- Map the data to the POJO to access the fields you need.
POJO:
import java.io.Serializable;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonRootName;

@JsonRootName("person")
public class Person implements Serializable {

    private static final long serialVersionUID = 1L;

    private String name;
    private String personalID;
    private String country;
    private String occupation;

    public Person() {
    }

    @JsonCreator
    public Person(@JsonProperty("name") String name, @JsonProperty("personalID") String personalID,
            @JsonProperty("country") String country, @JsonProperty("occupation") String occupation) {
        this.name = name;
        this.personalID = personalID;
        this.country = country;
        this.occupation = occupation;
    }

    // getters and setters stripped
}
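For reference, a record matching this POJO would look roughly like the following JSON (the values here are made up for illustration):

{"name": "Jane Doe", "personalID": "12345", "country": "India", "occupation": "Engineer"}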
Serializer:
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonSerializer<T> implements Serializer<T> {

    private final ObjectMapper om = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // nothing to configure
    }

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            // writeValueAsBytes produces UTF-8 encoded JSON
            return om.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            // preserve the cause for easier debugging
            throw new SerializationException(e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}
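A quick way to sanity-check this class is a round trip with the deserializer shown next; a minimal sketch, assuming the Person POJO above (the field values are made up):

JsonSerializer<Person> serializer = new JsonSerializer<>();
JsonDeserializer<Person> deserializer = new JsonDeserializer<>(Person.class);
byte[] bytes = serializer.serialize("any-topic", new Person("Jane Doe", "12345", "India", "Engineer"));
Person copy = deserializer.deserialize("any-topic", bytes);
System.out.println(copy.getName()); // prints: Jane Doe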
Deserializer:
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper om = new ObjectMapper();
    private Class<T> type;

    /*
     * Default constructor needed by Kafka when it instantiates the class
     * from configuration; the target type is then supplied in configure().
     */
    public JsonDeserializer() {
    }

    public JsonDeserializer(Class<T> type) {
        this.type = type;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        if (type == null) {
            type = (Class<T>) config.get("type");
        }
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null; // tombstone or empty payload
        }
        try {
            return om.readValue(bytes, type);
        } catch (Exception e) {
            throw new SerializationException(e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }

    protected Class<T> getType() {
        return type;
    }
}
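Note that when Kafka instantiates the deserializer itself (the no-arg constructor path), the target class has to arrive through the "type" config key that configure() reads. A sketch of that wiring for a plain consumer, assuming Kafka passes unrecognized config keys through to configure() (it may log a warning about the unknown "type" entry):

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "person-reader"); // hypothetical group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put("type", Person.class); // read back in configure() above
KafkaConsumer<String, Person> consumer = new KafkaConsumer<>(props);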
Consumer:
import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class ConsumerUtilities {

    public static Properties getProperties() {
        Properties configs = new Properties();
        // application.id is reused for the consumer group and internal
        // topic names, so it should not contain spaces
        configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-test-application");
        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return configs;
    }

    public static KStreamBuilder getStreamingConsumer() {
        return new KStreamBuilder();
    }

    public static void getStreamData() {
        JsonSerializer<Person> personJsonSerializer = new JsonSerializer<>();
        JsonDeserializer<Person> personJsonDeserializer = new JsonDeserializer<>(Person.class);
        Serde<Person> personSerde = Serdes.serdeFrom(personJsonSerializer, personJsonDeserializer);

        KStreamBuilder builder = getStreamingConsumer();
        KStream<String, Person> kStream = builder.stream(Serdes.String(), personSerde, "test");
        kStream.foreach((key, person) -> System.out.println(person.getCountry()));

        KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties());
        kafkaStreams.start();
    }
}
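To forward the typed records to another topic rather than just printing a field, the same serde can be reused with to(); a one-line sketch, where the "test-output" topic name is an assumption for illustration:

kStream.to(Serdes.String(), personSerde, "test-output"); // republish the mapped records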
Producer:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerUtilities {

    public static Producer<String, Person> getProducer() {
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka json producer");
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // the record key is a String, so use the String serializer
        // (ByteArraySerializer would fail on String keys)
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "com.kafka.api.serdes.JsonSerializer");
        return new KafkaProducer<>(configProperties);
    }

    public ProducerRecord<String, Person> createRecord(Person person) {
        // value-only record; the key stays null
        return new ProducerRecord<>("test", person);
    }
}
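Putting the producer pieces together, a minimal usage sketch (the field values are made up):

Producer<String, Person> producer = ProducerUtilities.getProducer();
ProducerUtilities utils = new ProducerUtilities();
Person person = new Person("Jane Doe", "12345", "India", "Engineer");
producer.send(utils.createRecord(person)); // key is null, value is the POJO
producer.flush();
producer.close();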