Java Kafka 消费者和 avro 解串器
Java Kafka consumer and avro deserialzier
我正在开发一个简单的 java with spark streaming。
我配置了一个 kafka jdbc 连接器(postgres 到主题),我想用 spark streaming 消费者阅读它。
我可以通过以下方式正确阅读主题:
./kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --property print.key=true --from-beginning --topic postgres-ip_audit
得到这个结果:
null
{"id":1557,"ip":{"string":"90.228.176.138"},"create_ts":{"long":1554819937582}}
当我使用具有此配置的 java 应用程序时:
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "groupStreamId");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
我得到这样的结果:
�179.20.119.53�����Z
谁能告诉我如何解决我的问题?
我也尝试使用 ByteArrayDeserializer 并将 bytes[] 转换为字符串,但我总是得到错误的字符结果。
您提供了一个 StringDeserializer,但是您正在发送使用 avro 序列化的值,因此您需要相应地反序列化它们。使用spark 2.4.0(和下面的deps编译org.apache.spark:spark-avro_2.12:2.4.1
你可以通过使用from_avro
函数来实现它:
import org.apache.spark.sql.avro._
// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("path/to/your/schema.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
Dataset<Row> output = df
.select(from_avro(col("value"), jsonFormatSchema).as("user"))
.where("user.favorite_color == \"red\"")
.show()
如果您需要使用架构注册表(就像您使用 kafka-avro-console-consumer 所做的那样),则不可能开箱即用,需要编写大量代码。我会推荐使用这个库 https://github.com/AbsaOSS/ABRiS。但是它只与 spark 2.3.0
兼容
您可以使用 io.confluent.kafka.serializers.KafkaAvroDeserializer
反序列化 avro 消息并使用模式注册表来管理记录模式。
这是一个示例代码片段
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
public class SparkStreaming {
public static void main(String... args) {
SparkConf conf = new SparkConf();
conf.setMaster("local[2]");
conf.setAppName("Spark Streaming Test Java");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(10));
processStream(ssc, sc);
ssc.start();
ssc.awaitTermination();
}
private static void processStream(JavaStreamingContext ssc, JavaSparkContext sc) {
System.out.println("--> Processing stream");
Map<String, String> props = new HashMap<>();
props.put("bootstrap.servers", "localhost:9092");
props.put("schema.registry.url", "http://localhost:8081");
props.put("group.id", "spark");
props.put("specific.avro.reader", "true");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Set<String> topicsSet = new HashSet<>(Collections.singletonList("test"));
JavaPairInputDStream<String, Object> stream = KafkaUtils.createDirectStream(ssc, String.class, Object.class,
StringDecoder.class, KafkaAvroDecoder.class, props, topicsSet);
stream.foreachRDD(rdd -> {
rdd.foreachPartition(iterator -> {
while (iterator.hasNext()) {
Tuple2<String, Object> next = iterator.next();
Model model = (Model) next._2();
System.out.println(next._1() + " --> " + model);
}
}
);
});
}
}
Complete sample application is available in this github repo
我正在开发一个简单的 java with spark streaming。
我配置了一个 kafka jdbc 连接器(postgres 到主题),我想用 spark streaming 消费者阅读它。
我可以通过以下方式正确阅读主题:
./kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --property print.key=true --from-beginning --topic postgres-ip_audit
得到这个结果:
null {"id":1557,"ip":{"string":"90.228.176.138"},"create_ts":{"long":1554819937582}}
当我使用具有此配置的 java 应用程序时:
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "groupStreamId");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
我得到这样的结果:
�179.20.119.53�����Z
谁能告诉我如何解决我的问题?
我也尝试使用 ByteArrayDeserializer 并将 bytes[] 转换为字符串,但我总是得到错误的字符结果。
您提供了一个 StringDeserializer,但是您正在发送使用 avro 序列化的值,因此您需要相应地反序列化它们。使用spark 2.4.0(和下面的deps编译org.apache.spark:spark-avro_2.12:2.4.1
你可以通过使用from_avro
函数来实现它:
import org.apache.spark.sql.avro._
// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("path/to/your/schema.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
Dataset<Row> output = df
.select(from_avro(col("value"), jsonFormatSchema).as("user"))
.where("user.favorite_color == \"red\"")
.show()
如果您需要使用架构注册表(就像您使用 kafka-avro-console-consumer 所做的那样),则不可能开箱即用,需要编写大量代码。我会推荐使用这个库 https://github.com/AbsaOSS/ABRiS。但是它只与 spark 2.3.0
兼容您可以使用 io.confluent.kafka.serializers.KafkaAvroDeserializer
反序列化 avro 消息并使用模式注册表来管理记录模式。
这是一个示例代码片段
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
public class SparkStreaming {
public static void main(String... args) {
SparkConf conf = new SparkConf();
conf.setMaster("local[2]");
conf.setAppName("Spark Streaming Test Java");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(10));
processStream(ssc, sc);
ssc.start();
ssc.awaitTermination();
}
private static void processStream(JavaStreamingContext ssc, JavaSparkContext sc) {
System.out.println("--> Processing stream");
Map<String, String> props = new HashMap<>();
props.put("bootstrap.servers", "localhost:9092");
props.put("schema.registry.url", "http://localhost:8081");
props.put("group.id", "spark");
props.put("specific.avro.reader", "true");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Set<String> topicsSet = new HashSet<>(Collections.singletonList("test"));
JavaPairInputDStream<String, Object> stream = KafkaUtils.createDirectStream(ssc, String.class, Object.class,
StringDecoder.class, KafkaAvroDecoder.class, props, topicsSet);
stream.foreachRDD(rdd -> {
rdd.foreachPartition(iterator -> {
while (iterator.hasNext()) {
Tuple2<String, Object> next = iterator.next();
Model model = (Model) next._2();
System.out.println(next._1() + " --> " + model);
}
}
);
});
}
}
Complete sample application is available in this github repo